From f288afe3e9990b3dc060299083a8a3ba307ae2eb Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Thu, 20 Feb 2014 14:08:01 +0000
Subject: [PATCH] OPENDJ-1271 (CR-3008) dsreplication pre-external-initialization task fails with STOPPED_BY_ERROR
---
opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/service/ReplicationBrokerTest.java | 268 ++++++++
opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/protocol/TopologyMsg.java | 252 +++----
opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/AssuredReplicationServerTest.java | 72 -
opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/service/ReplicationMonitor.java | 2
opendj-sdk/opendj3-server-dev/src/messages/messages/replication_es.properties | 2
opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/common/DSInfo.java | 13
opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/service/ReplicationDomainTest.java | 69 +-
opendj-sdk/opendj3-server-dev/src/messages/messages/replication_de.properties | 3
opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/ProtocolCompatibilityTest.java | 87 +-
opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/service/ReplicationBroker.java | 532 ++++++++++++-----
opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/StateMachineTest.java | 16
opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/common/RSInfo.java | 41
opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/ReplicationServerHandler.java | 6
opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/FractionalReplicationTest.java | 8
opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java | 4
opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/ReplicationTestCase.java | 2
opendj-sdk/opendj3-server-dev/src/messages/messages/replication_fr.properties | 2
opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/service/ReplicationDomain.java | 355 +++++------
opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/HistoricalCsnOrderingTest.java | 2
opendj-sdk/opendj3-server-dev/src/messages/messages/replication.properties | 2
opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/TopologyViewTest.java | 5
opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java | 2
22 files changed, 1,077 insertions(+), 668 deletions(-)
diff --git a/opendj-sdk/opendj3-server-dev/src/messages/messages/replication.properties b/opendj-sdk/opendj3-server-dev/src/messages/messages/replication.properties
index 9f3583f..f2d3447 100644
--- a/opendj-sdk/opendj3-server-dev/src/messages/messages/replication.properties
+++ b/opendj-sdk/opendj3-server-dev/src/messages/messages/replication.properties
@@ -347,7 +347,7 @@
Bad msg id sequence during import. Expected:%s Actual:%s
ERR_INIT_NO_SUCCESS_START_FROM_SERVERS_193=\
The following servers did not acknowledge initialization in the expected \
- time. They are potentially down or too slow. Servers list: %s
+ time for domain %s. They are potentially down or too slow. Servers list: %s
ERR_INIT_NO_SUCCESS_END_FROM_SERVERS_194=\
The following servers did not end initialization being connected with the \
right generation (%s). They are potentially stopped or too slow. \
diff --git a/opendj-sdk/opendj3-server-dev/src/messages/messages/replication_de.properties b/opendj-sdk/opendj3-server-dev/src/messages/messages/replication_de.properties
index d7fa9b5..b82de97 100644
--- a/opendj-sdk/opendj3-server-dev/src/messages/messages/replication_de.properties
+++ b/opendj-sdk/opendj3-server-dev/src/messages/messages/replication_de.properties
@@ -123,6 +123,7 @@
ERR_REPLICATION_SERVER_CONFIG_NOT_FOUND_110=Die Konfiguration des Replikationsservers konnte nicht gefunden werden
DEBUG_GOING_TO_SEARCH_FOR_CHANGES_111=Der Replikationsserver ist hinsichtlich unserer \u00c4nderungen versp\u00e4tet: fehlende \u00c4nderungen werden gesendet
DEBUG_CHANGES_SENT_113=Alle fehlenden \u00c4nderungen wurden an den Replikationsserver gesendet
+<<<<<<< .working
ERR_PUBLISHING_FAKE_OPS_114=Aufgefangene Ausnahme ver\u00f6ffentlicht Scheinvorg\u00e4nge f\u00fcr Dom\u00e4ne %s : %s
ERR_COMPUTING_FAKE_OPS_115=Aufgefangene Ausnahme berechnet Scheinvorg\u00e4nge f\u00fcr Dom\u00e4ne %s f\u00fcr Replikationsserver %s : %s
NOTE_SERVER_STATE_RECOVERY_117=ServerState-Wiederherstellung f\u00fcr Dom\u00e4ne %s, aktualisiert mit changeNumber %s
@@ -182,7 +183,7 @@
ERR_INIT_IMPORT_FAILURE_190=W\u00e4hrend der Initialisierung von einem Remote-Server ist der folgende Fehler aufgetreten: %s
ERR_INIT_RS_DISCONNECTION_DURING_IMPORT_191=Verbindungsfehler mit Replikationsserver %s w\u00e4hrend Import
ERR_INIT_BAD_MSG_ID_SEQ_DURING_IMPORT_192=Ung\u00fcltige Meldungs-ID-Sequenz w\u00e4hrend Import. Erwartet: %s Tats\u00e4chlich: %s
-ERR_INIT_NO_SUCCESS_START_FROM_SERVERS_193=Die folgenden Server haben die Initialisierung nicht in der erwarteten Zeit best\u00e4tigt. Sie sind potenziell nicht verf\u00fcgbar oder zu langsam. Server-Liste: %s
+ERR_INIT_NO_SUCCESS_START_FROM_SERVERS_193=Die folgenden Server haben die Initialisierung nicht in der erwarteten Zeit best\u00e4tigt f\u00fcr Dom\u00e4ne %s. Sie sind potenziell nicht verf\u00fcgbar oder zu langsam. Server-Liste: %s
ERR_INIT_NO_SUCCESS_END_FROM_SERVERS_194=Die folgenden Server haben die Initialisierung nicht in Verbindung mit der entsprechenden Generation (%s) beendet. Sie wurden gestoppt oder sind zu langsam. Server-Liste: %s
ERR_INIT_RS_DISCONNECTION_DURING_EXPORT_195=Verbindung zu Replikationsserver mit Server-ID=%s w\u00e4hrend Initialisierung von Remote-Server(n) unterbrochen
ERR_INIT_HEARTBEAT_LOST_DURING_EXPORT_196=Initialisierter Server mit Server-ID=%s bei Initialisierung von Remote-Server(n) gestoppt oder zu langsam
diff --git a/opendj-sdk/opendj3-server-dev/src/messages/messages/replication_es.properties b/opendj-sdk/opendj3-server-dev/src/messages/messages/replication_es.properties
index c8c56cf..ee5506d 100644
--- a/opendj-sdk/opendj3-server-dev/src/messages/messages/replication_es.properties
+++ b/opendj-sdk/opendj3-server-dev/src/messages/messages/replication_es.properties
@@ -182,7 +182,7 @@
ERR_INIT_IMPORT_FAILURE_190=Durante la inicializaci\u00f3n desde un servidor remoto, se ha producido el siguiente error: %s
ERR_INIT_RS_DISCONNECTION_DURING_IMPORT_191=Error de conexi\u00f3n con el Servidor de repetici\u00f3n %s durante la importaci\u00f3n
ERR_INIT_BAD_MSG_ID_SEQ_DURING_IMPORT_192=Secuencia de Id. de mensaje incorrecto durante la importaci\u00f3n. Se requiere:%s Real:%s
-ERR_INIT_NO_SUCCESS_START_FROM_SERVERS_193=Los siguientes servidores no han confirmado su inicializaci\u00f3n en el tiempo esperado. Posiblemente est\u00e1n apagados o son demasiado lentos. Lista de servidores: %s
+ERR_INIT_NO_SUCCESS_START_FROM_SERVERS_193=Los siguientes servidores no han confirmado su inicializaci\u00f3n en el tiempo esperado para el ND de base %s. Posiblemente est\u00e1n apagados o son demasiado lentos. Lista de servidores: %s
ERR_INIT_NO_SUCCESS_END_FROM_SERVERS_194=Los siguientes servidores no finalizaron la inicializaci\u00f3n estando conectados con la generaci\u00f3n correcta (%s). Posiblemente est\u00e1n detenidos o son demasiado lentos. Lista de servidores: %s
ERR_INIT_RS_DISCONNECTION_DURING_EXPORT_195=Al inicializar los servidores remotos, se ha perdido la conexi\u00f3n con el Servidor de repetici\u00f3n con Id. de servidor=%s
ERR_INIT_HEARTBEAT_LOST_DURING_EXPORT_196=Al inicializar los servidores remotos, el servidor inicializado con Id. de servidor=%s posiblemente se ha detenido o es demasiado lento
diff --git a/opendj-sdk/opendj3-server-dev/src/messages/messages/replication_fr.properties b/opendj-sdk/opendj3-server-dev/src/messages/messages/replication_fr.properties
index c3f8c6e..147fba0 100644
--- a/opendj-sdk/opendj3-server-dev/src/messages/messages/replication_fr.properties
+++ b/opendj-sdk/opendj3-server-dev/src/messages/messages/replication_fr.properties
@@ -182,7 +182,7 @@
ERR_INIT_IMPORT_FAILURE_190=L'erreur suivante s'est produite lors de l'initialisation \u00e0 partir d'un serveur distant : %s
ERR_INIT_RS_DISCONNECTION_DURING_IMPORT_191=\u00c9chec de la connexion au serveur de r\u00e9plication %s lors de l'importation
ERR_INIT_BAD_MSG_ID_SEQ_DURING_IMPORT_192=Mauvaise s\u00e9quence d'ID de message lors de l'importation. Attendu\u00a0: %s Re\u00e7u\u00a0: %s
-ERR_INIT_NO_SUCCESS_START_FROM_SERVERS_193=Les serveurs suivants n'ont pas reconnu l'initialisation dans le d\u00e9lai pr\u00e9vu. Ils sont probablement en panne ou trop lents. Liste des serveurs\u00a0: %s
+ERR_INIT_NO_SUCCESS_START_FROM_SERVERS_193=Les serveurs suivants n'ont pas reconnu l'initialisation dans le d\u00e9lai pr\u00e9vu pour le domaine %s. Ils sont probablement en panne ou trop lents. Liste des serveurs\u00a0: %s
ERR_INIT_NO_SUCCESS_END_FROM_SERVERS_194=Les serveurs suivants n'ont pas termin\u00e9 l'initialisation en cours de connexion avec la bonne g\u00e9n\u00e9ration (%s). Ils sont probablement arr\u00eat\u00e9s ou trop lents. Liste des serveurs\u00a0: %s
ERR_INIT_RS_DISCONNECTION_DURING_EXPORT_195=La connexion au serveur de r\u00e9plication ayant l'identifiant serverId=%s a \u00e9t\u00e9 interrompue lors de l'initalisation du/des serveur(s) distant(s)
ERR_INIT_HEARTBEAT_LOST_DURING_EXPORT_196=Le serveur initialis\u00e9 ayant l'identifiant serverId=%s \u00e9tait probablement arr\u00eat\u00e9 ou trop lent lors de l'initialisation du/des serveur(s) distant(s)
diff --git a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/common/DSInfo.java b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/common/DSInfo.java
index fb42a43..bb062fa 100644
--- a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/common/DSInfo.java
+++ b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/common/DSInfo.java
@@ -283,9 +283,9 @@
&& equals(eclIncludesForDeletes, dsInfo.getEclIncludesForDeletes());
}
- private boolean equals(Set<String> o1, Set<String> o2)
+ private boolean equals(Object o1, Object o2)
{
- return (o1 == null && o2 == null) || (o1 != null && o1.equals(o2));
+ return o1 == null ? o2 == null : o1.equals(o2);
}
/**
@@ -320,15 +320,18 @@
@Override
public String toString()
{
- StringBuilder sb = new StringBuilder();
+ final StringBuilder sb = new StringBuilder();
sb.append("DS id: ").append(dsId);
sb.append(" ; DS url: ").append(dsUrl);
sb.append(" ; RS id: ").append(rsId);
sb.append(" ; Generation id: ").append(generationId);
sb.append(" ; Status: ").append(status);
sb.append(" ; Assured replication: ").append(assuredFlag);
- sb.append(" ; Assured mode: ").append(assuredMode);
- sb.append(" ; Safe data level: ").append(safeDataLevel);
+ if (assuredFlag)
+ {
+ sb.append(" ; Assured mode: ").append(assuredMode);
+ sb.append(" ; Safe data level: ").append(safeDataLevel);
+ }
sb.append(" ; Group id: ").append(groupId);
sb.append(" ; Protocol version: ").append(protocolVersion);
sb.append(" ; Referral URLs: ").append(refUrls);
diff --git a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/common/RSInfo.java b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/common/RSInfo.java
index c1aeea7..a101dd6 100644
--- a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/common/RSInfo.java
+++ b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/common/RSInfo.java
@@ -22,7 +22,7 @@
*
*
* Copyright 2008-2010 Sun Microsystems, Inc.
- * Portions Copyright 2012-2013 ForgeRock AS
+ * Portions Copyright 2012-2014 ForgeRock AS
*/
package org.opends.server.replication.common;
@@ -36,7 +36,7 @@
public final class RSInfo
{
/** Server id of the RS. */
- private final int id;
+ private final int rsServerId;
/** Generation Id of the RS. */
private final long generationId;
/** Group id of the RS. */
@@ -50,22 +50,22 @@
*/
private final int weight;
/** The server URL of the RS. */
- private final String serverUrl;
+ private final String rsServerURL;
/**
* Creates a new instance of RSInfo with every given info.
*
- * @param id The RS id
- * @param serverUrl Url of the RS
+ * @param rsServerId The RS id
+ * @param rsServerURL Url of the RS
* @param generationId The generation id the RS is using
* @param groupId RS group id
* @param weight RS weight
*/
- public RSInfo(int id, String serverUrl,
+ public RSInfo(int rsServerId, String rsServerURL,
long generationId, byte groupId, int weight)
{
- this.id = id;
- this.serverUrl = serverUrl;
+ this.rsServerId = rsServerId;
+ this.rsServerURL = rsServerURL;
this.generationId = generationId;
this.groupId = groupId;
this.weight = weight;
@@ -77,7 +77,7 @@
*/
public int getId()
{
- return id;
+ return rsServerId;
}
/**
@@ -125,12 +125,10 @@
return false;
}
final RSInfo rsInfo = (RSInfo) obj;
- return id == rsInfo.getId()
+ return rsServerId == rsInfo.getId()
&& generationId == rsInfo.getGenerationId()
&& groupId == rsInfo.getGroupId()
- && weight == rsInfo.getWeight()
- && ((serverUrl == null && rsInfo.getServerUrl() == null)
- || (serverUrl != null && serverUrl.equals(rsInfo.getServerUrl())));
+ && weight == rsInfo.getWeight();
}
/**
@@ -141,11 +139,10 @@
public int hashCode()
{
int hash = 7;
- hash = 17 * hash + this.id;
+ hash = 17 * hash + this.rsServerId;
hash = 17 * hash + (int) (this.generationId ^ (this.generationId >>> 32));
hash = 17 * hash + this.groupId;
hash = 17 * hash + this.weight;
- hash = 17 * hash + (this.serverUrl != null ? this.serverUrl.hashCode() : 0);
return hash;
}
@@ -155,7 +152,7 @@
*/
public String getServerUrl()
{
- return serverUrl;
+ return rsServerURL;
}
/**
@@ -165,12 +162,10 @@
@Override
public String toString()
{
- StringBuilder sb = new StringBuilder();
- sb.append("Id: ").append(id);
- sb.append(" ; Server URL: ").append(serverUrl);
- sb.append(" ; Generation id: ").append(generationId);
- sb.append(" ; Group id: ").append(groupId);
- sb.append(" ; Weight: ").append(weight);
- return sb.toString();
+ return "RS id: " + rsServerId
+ + " ; RS URL: " + rsServerURL
+ + " ; Generation id: " + generationId
+ + " ; Group id: " + groupId
+ + " ; Weight: " + weight;
}
}
diff --git a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/protocol/TopologyMsg.java b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/protocol/TopologyMsg.java
index d9e2e47..72b73d7 100644
--- a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/protocol/TopologyMsg.java
+++ b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/protocol/TopologyMsg.java
@@ -38,15 +38,14 @@
import org.opends.server.replication.common.ServerStatus;
/**
- *
* This class defines a message that is sent:
* - By a RS to the other RSs in the topology, containing:
- * - the list of DSs directly connected to the RS in the DS list
- * - only this RS in the RS list
+ * - the DSs directly connected to the RS in the DS infos
+ * - only this RS in the RS infos
* - By a RS to his connected DSs, containing every DSs and RSs he knows.
* In that case the message contains:
- * - the list of every DS the RS knows except the destinator DS in the DS list
- * - the list of every connected RSs (including the sending RS) in the RS list
+ * - every DSs the RS knows except the destinator DS in the DS infos
+ * - every connected RSs (including the sending RS) in the RS infos
*
* Exchanging these messages allows to have each RS or DS take
* appropriate decisions according to the current topology:
@@ -56,10 +55,10 @@
*/
public class TopologyMsg extends ReplicationMsg
{
- // Information for the DS known in the topology
- private final List<DSInfo> dsList;
- // Information for the RS known in the topology
- private final List<RSInfo> rsList;
+ /** Information for the DSs (aka replicas) known in the topology. */
+ private final Map<Integer, DSInfo> replicaInfos;
+ /** Information for the RSs known in the topology. */
+ private final List<RSInfo> rsInfos;
/**
* Creates a new changelogInfo message from its encoded form.
@@ -77,18 +76,18 @@
if (in.length < 1 || in[0] != MSG_TYPE_TOPOLOGY)
{
throw new DataFormatException(
- "Input is not a valid " + this.getClass().getCanonicalName());
+ "Input is not a valid " + getClass().getCanonicalName());
}
int pos = 1;
/* Read number of following DS info entries */
-
byte nDsInfo = in[pos++];
/* Read the DS info entries */
- List<DSInfo> dsList = new ArrayList<DSInfo>(Math.max(0, nDsInfo));
- while ( (nDsInfo > 0) && (pos < in.length) )
+ Map<Integer, DSInfo> replicaInfos =
+ new HashMap<Integer, DSInfo>(Math.max(0, nDsInfo));
+ while (nDsInfo > 0 && pos < in.length)
{
/* Read DS id */
int length = getNextLength(in, pos);
@@ -110,26 +109,21 @@
}
/* Read RS id */
- length =
- getNextLength(in, pos);
- serverIdString =
- new String(in, pos, length, "UTF-8");
+ length = getNextLength(in, pos);
+ serverIdString = new String(in, pos, length, "UTF-8");
int rsId = Integer.valueOf(serverIdString);
pos += length + 1;
/* Read the generation id */
length = getNextLength(in, pos);
- long generationId =
- Long.valueOf(new String(in, pos, length,
- "UTF-8"));
+ long generationId = Long.valueOf(new String(in, pos, length, "UTF-8"));
pos += length + 1;
/* Read DS status */
ServerStatus status = ServerStatus.valueOf(in[pos++]);
/* Read DS assured flag */
- boolean assuredFlag;
- assuredFlag = in[pos++] == 1;
+ boolean assuredFlag = in[pos++] == 1;
/* Read DS assured mode */
AssuredMode assuredMode = AssuredMode.valueOf(in[pos++]);
@@ -142,50 +136,18 @@
/* Read number of referrals URLs */
List<String> refUrls = new ArrayList<String>();
- byte nUrls = in[pos++];
- byte nRead = 0;
- /* Read urls until expected number read */
- while ((nRead != nUrls) &&
- (pos < in.length) //security
- )
- {
- length = getNextLength(in, pos);
- String url = new String(in, pos, length, "UTF-8");
- refUrls.add(url);
- pos += length + 1;
- nRead++;
- }
+ pos = readStrings(in, pos, refUrls);
Set<String> attrs = new HashSet<String>();
Set<String> delattrs = new HashSet<String>();
short protocolVersion = -1;
if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
{
- byte nAttrs = in[pos++];
- nRead = 0;
- /* Read attrs until expected number read */
- while ((nRead != nAttrs) && (pos < in.length))
- {
- length = getNextLength(in, pos);
- String attr = new String(in, pos, length, "UTF-8");
- attrs.add(attr);
- pos += length + 1;
- nRead++;
- }
+ pos = readStrings(in, pos, attrs);
if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V5)
{
- nAttrs = in[pos++];
- nRead = 0;
- /* Read attrs until expected number read */
- while ((nRead != nAttrs) && (pos < in.length))
- {
- length = getNextLength(in, pos);
- String attr = new String(in, pos, length, "UTF-8");
- delattrs.add(attr);
- pos += length + 1;
- nRead++;
- }
+ pos = readStrings(in, pos, delattrs);
}
else
{
@@ -194,26 +156,23 @@
}
/* Read Protocol version */
- protocolVersion = (short)in[pos++];
+ protocolVersion = in[pos++];
}
- /* Now create DSInfo and store it in list */
-
- DSInfo dsInfo = new DSInfo(dsId, dsUrl, rsId, generationId, status,
- assuredFlag, assuredMode, safeDataLevel, groupId, refUrls, attrs,
- delattrs, protocolVersion);
- dsList.add(dsInfo);
+ /* Now create DSInfo and store it */
+ replicaInfos.put(dsId, new DSInfo(dsId, dsUrl, rsId, generationId,
+ status, assuredFlag, assuredMode, safeDataLevel, groupId, refUrls,
+ attrs, delattrs, protocolVersion));
nDsInfo--;
}
/* Read number of following RS info entries */
-
byte nRsInfo = in[pos++];
/* Read the RS info entries */
- List<RSInfo> rsList = new ArrayList<RSInfo>(Math.max(0, nRsInfo));
- while ( (nRsInfo > 0) && (pos < in.length) )
+ List<RSInfo> rsInfos = new ArrayList<RSInfo>(Math.max(0, nRsInfo));
+ while (nRsInfo > 0 && pos < in.length)
{
/* Read RS id */
int length = getNextLength(in, pos);
@@ -223,9 +182,7 @@
/* Read the generation id */
length = getNextLength(in, pos);
- long generationId =
- Long.valueOf(new String(in, pos, length,
- "UTF-8"));
+ long generationId = Long.valueOf(new String(in, pos, length, "UTF-8"));
pos += length + 1;
/* Read RS group id */
@@ -245,47 +202,67 @@
pos += length + 1;
}
- /* Now create RSInfo and store it in list */
-
- RSInfo rsInfo = new RSInfo(id, serverUrl, generationId, groupId,
- weight);
- rsList.add(rsInfo);
+ /* Now create RSInfo and store it */
+ rsInfos.add(new RSInfo(id, serverUrl, generationId, groupId, weight));
nRsInfo--;
}
- this.dsList = Collections.unmodifiableList(dsList);
- this.rsList = Collections.unmodifiableList(rsList);
- } catch (UnsupportedEncodingException e)
+ this.replicaInfos = Collections.unmodifiableMap(replicaInfos);
+ this.rsInfos = Collections.unmodifiableList(rsInfos);
+ }
+ catch (UnsupportedEncodingException e)
{
throw new DataFormatException("UTF-8 is not supported by this jvm.");
}
}
- /**
- * Creates a new message from a list of the currently connected servers.
- *
- * @param dsList The list of currently connected DS servers ID.
- * @param rsList The list of currently connected RS servers ID.
- */
- public TopologyMsg(List<DSInfo> dsList, List<RSInfo> rsList)
+ private int readStrings(byte[] in, int pos, Collection<String> outputCol)
+ throws DataFormatException, UnsupportedEncodingException
{
- if (dsList == null || dsList.isEmpty())
+ byte nAttrs = in[pos++];
+ byte nRead = 0;
+ // Read all elements until expected number read
+ while (nRead != nAttrs && pos < in.length)
{
- this.dsList = Collections.emptyList();
+ int length = getNextLength(in, pos);
+ outputCol.add(new String(in, pos, length, "UTF-8"));
+ pos += length + 1;
+ nRead++;
+ }
+ return pos;
+ }
+
+ /**
+ * Creates a new message of the currently connected servers.
+ *
+ * @param dsInfos The collection of currently connected DS servers ID.
+ * @param rsInfos The list of currently connected RS servers ID.
+ */
+ public TopologyMsg(Collection<DSInfo> dsInfos, List<RSInfo> rsInfos)
+ {
+ if (dsInfos == null || dsInfos.isEmpty())
+ {
+ this.replicaInfos = Collections.emptyMap();
}
else
{
- this.dsList = Collections.unmodifiableList(new ArrayList<DSInfo>(dsList));
+ Map<Integer, DSInfo> replicas = new HashMap<Integer, DSInfo>();
+ for (DSInfo dsInfo : dsInfos)
+ {
+ replicas.put(dsInfo.getDsId(), dsInfo);
+ }
+ this.replicaInfos = Collections.unmodifiableMap(replicas);
}
- if (rsList == null || rsList.isEmpty())
+ if (rsInfos == null || rsInfos.isEmpty())
{
- this.rsList = Collections.emptyList();
+ this.rsInfos = Collections.emptyList();
}
else
{
- this.rsList = Collections.unmodifiableList(new ArrayList<RSInfo>(rsList));
+ this.rsInfos =
+ Collections.unmodifiableList(new ArrayList<RSInfo>(rsInfos));
}
}
@@ -293,12 +270,9 @@
// Msg encoding
// ============
- /**
- * {@inheritDoc}
- */
+ /** {@inheritDoc} */
@Override
- public byte[] getBytes(short version)
- throws UnsupportedEncodingException
+ public byte[] getBytes(short version) throws UnsupportedEncodingException
{
try
{
@@ -313,10 +287,10 @@
oStream.write(MSG_TYPE_TOPOLOGY);
// Put number of following DS info entries
- oStream.write((byte)dsList.size());
+ oStream.write((byte) replicaInfos.size());
// Put DS info
- for (DSInfo dsInfo : dsList)
+ for (DSInfo dsInfo : replicaInfos.values())
{
// Put DS id
byte[] byteServerId =
@@ -330,8 +304,7 @@
oStream.write(0);
}
// Put RS id
- byteServerId =
- String.valueOf(dsInfo.getRsId()).getBytes("UTF-8");
+ byteServerId = String.valueOf(dsInfo.getRsId()).getBytes("UTF-8");
oStream.write(byteServerId);
oStream.write(0);
// Put the generation id
@@ -349,36 +322,16 @@
// Put DS group id
oStream.write(dsInfo.getGroupId());
- List<String> refUrls = dsInfo.getRefUrls();
- // Put number of following URLs as a byte
- oStream.write(refUrls.size());
- for (String url : refUrls)
- {
- // Write the url and a 0 terminating byte
- oStream.write(url.getBytes("UTF-8"));
- oStream.write(0);
- }
+ writeStrings(oStream, dsInfo.getRefUrls());
if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
{
// Put ECL includes
- Set<String> attrs = dsInfo.getEclIncludes();
- oStream.write(attrs.size());
- for (String attr : attrs)
- {
- oStream.write(attr.getBytes("UTF-8"));
- oStream.write(0);
- }
+ writeStrings(oStream, dsInfo.getEclIncludes());
if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V5)
{
- Set<String> delattrs = dsInfo.getEclIncludesForDeletes();
- oStream.write(delattrs.size());
- for (String attr : delattrs)
- {
- oStream.write(attr.getBytes("UTF-8"));
- oStream.write(0);
- }
+ writeStrings(oStream, dsInfo.getEclIncludesForDeletes());
}
oStream.write(dsInfo.getProtocolVersion());
@@ -386,10 +339,10 @@
}
// Put number of following RS info entries
- oStream.write((byte)rsList.size());
+ oStream.write((byte) rsInfos.size());
// Put RS info
- for (RSInfo rsInfo : rsList)
+ for (RSInfo rsInfo : rsInfos)
{
// Put RS id
byte[] byteServerId =
@@ -422,54 +375,65 @@
// never happens
throw new RuntimeException(e);
}
-
}
+ private void writeStrings(ByteArrayOutputStream oStream,
+ Collection<String> col) throws IOException, UnsupportedEncodingException
+ {
+ // Put collection length as a byte
+ oStream.write(col.size());
+ for (String elem : col)
+ {
+ // Write the element and a 0 terminating byte
+ oStream.write(elem.getBytes("UTF-8"));
+ oStream.write(0);
+ }
+ }
-
- /**
- * {@inheritDoc}
- */
+ /** {@inheritDoc} */
@Override
public String toString()
{
String dsStr = "";
- for (DSInfo dsInfo : dsList)
+ for (DSInfo dsInfo : replicaInfos.values())
{
- dsStr += dsInfo.toString() + "\n----------------------------\n";
+ dsStr += dsInfo + "\n----------------------------\n";
}
String rsStr = "";
- for (RSInfo rsInfo : rsList)
+ for (RSInfo rsInfo : rsInfos)
{
- rsStr += rsInfo.toString() + "\n----------------------------\n";
+ rsStr += rsInfo + "\n----------------------------\n";
}
- return ("TopologyMsg content: "
+ return "TopologyMsg content:"
+ "\n----------------------------"
+ "\nCONNECTED DS SERVERS:"
+ "\n--------------------\n"
+ dsStr
+ "CONNECTED RS SERVERS:"
+ "\n--------------------\n"
- + rsStr + (rsStr.equals("") ? "----------------------------\n" : ""));
+ + rsStr
+ + (rsStr.equals("") ? "----------------------------\n" : "");
}
/**
- * Get the list of DS info.
- * @return The list of DS info
+ * Get the DS infos.
+ *
+ * @return The DS infos
*/
- public List<DSInfo> getDsList()
+ public Map<Integer, DSInfo> getReplicaInfos()
{
- return dsList;
+ return replicaInfos;
}
/**
- * Get the list of RS info.
- * @return The list of RS info
+ * Get the RS infos.
+ *
+ * @return The RS infos
*/
- public List<RSInfo> getRsList()
+ public List<RSInfo> getRsInfos()
{
- return rsList;
+ return rsInfos;
}
}
diff --git a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/ReplicationServerHandler.java b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/ReplicationServerHandler.java
index 6f0d924..4928623 100644
--- a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/ReplicationServerHandler.java
+++ b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/ReplicationServerHandler.java
@@ -444,7 +444,7 @@
if (getProtocolVersion() >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
{
// List should only contain RS info for sender
- RSInfo rsInfo = inTopoMsg.getRsList().get(0);
+ RSInfo rsInfo = inTopoMsg.getRsInfos().get(0);
weight = rsInfo.getWeight();
}
@@ -579,7 +579,7 @@
public void processTopoInfoFromRS(TopologyMsg topoMsg)
{
// List should only contain RS info for sender
- final RSInfo rsInfo = topoMsg.getRsList().get(0);
+ final RSInfo rsInfo = topoMsg.getRsInfos().get(0);
generationId = rsInfo.getGenerationId();
groupId = rsInfo.getGroupId();
weight = rsInfo.getWeight();
@@ -589,7 +589,7 @@
clearRemoteLSHandlers();
// Creates the new structure according to the message received.
- for (DSInfo dsInfo : topoMsg.getDsList())
+ for (DSInfo dsInfo : topoMsg.getReplicaInfos().values())
{
// For each DS connected to the peer RS
DSInfo clonedDSInfo = dsInfo.cloneWithReplicationServerId(serverId);
diff --git a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/service/ReplicationBroker.java b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/service/ReplicationBroker.java
index c5f305f..d9e6fca 100644
--- a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/service/ReplicationBroker.java
+++ b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/service/ReplicationBroker.java
@@ -173,11 +173,7 @@
// @NotNull // for the reference
private final AtomicReference<ConnectedRS> connectedRS =
new AtomicReference<ConnectedRS>(ConnectedRS.noConnectedRS());
- /**
- * Our replication domain.
- * <p>
- * Can be null for unit test purpose.
- */
+ /** Our replication domain. */
private ReplicationDomain domain;
/**
* This object is used as a conditional event to be notified about
@@ -217,25 +213,14 @@
/*
* Properties for the last topology info received from the network.
*/
- /**
- * Info for other DSs.
- * <p>
- * Warning: does not contain info for us (for our server id)
- */
- private volatile List<DSInfo> dsList = new ArrayList<DSInfo>();
- private volatile long generationID;
+ /** Contains the last known state of the replication topology. */
+ private final AtomicReference<Topology> topology =
+ new AtomicReference<Topology>(new Topology());
+ /** <pre>@GuardedBy("this")</pre>. */
private volatile int updateDoneCount = 0;
private volatile boolean connectRequiresRecovery = false;
/**
- * The map of replication server info initialized at connection time and
- * regularly updated. This is used to decide to which best suitable
- * replication server one wants to connect. Key: replication server id Value:
- * replication server info for the matching replication server id
- */
- private volatile Map<Integer, ReplicationServerInfo> replicationServerInfos;
-
- /**
* This integer defines when the best replication server checking algorithm
* should be engaged.
* Every time a monitoring message (each monitoring publisher period) is
@@ -266,19 +251,16 @@
* @param state The ServerState that should be used by this broker
* when negotiating the session with the replicationServer.
* @param config The configuration to use.
- * @param generationId The generationId for the server associated to the
- * provided serverId and for the domain associated to the provided baseDN.
* @param replSessionSecurity The session security configuration.
*/
public ReplicationBroker(ReplicationDomain replicationDomain,
- ServerState state, ReplicationDomainCfg config, long generationId,
+ ServerState state, ReplicationDomainCfg config,
ReplSessionSecurity replSessionSecurity)
{
this.domain = replicationDomain;
this.state = state;
this.config = config;
this.replSessionSecurity = replSessionSecurity;
- this.generationID = generationId;
this.rcvWindow = getMaxRcvWindow();
this.halfRcvWindow = rcvWindow / 2;
@@ -352,8 +334,7 @@
*/
private long getGenerationID()
{
- generationID = domain.getGenerationID();
- return generationID;
+ return domain.getGenerationID();
}
/**
@@ -362,38 +343,7 @@
*/
public void setGenerationID(long generationID)
{
- this.generationID = generationID;
- }
-
- /**
- * Sets the locally configured flag for the passed ReplicationServerInfo
- * object, analyzing the local configuration.
- * @param rsInfo the Replication server to check and update
- */
- private void setLocallyConfiguredFlag(ReplicationServerInfo rsInfo)
- {
- // Determine if the passed ReplicationServerInfo has a URL that is present
- // in the locally configured replication servers
- String rsUrl = rsInfo.getServerURL();
- if (rsUrl == null)
- {
- // The ReplicationServerInfo has been generated from a server with
- // no URL in TopologyMsg (i.e: with replication protocol version < 4):
- // ignore this server as we do not know how to connect to it
- rsInfo.setLocallyConfigured(false);
- return;
- }
- for (String serverUrl : getReplicationServerUrls())
- {
- if (isSameReplicationServerUrl(serverUrl, rsUrl))
- {
- // This RS is locally configured, mark this
- rsInfo.setLocallyConfigured(true);
- rsInfo.setServerURL(serverUrl);
- return;
- }
- }
- rsInfo.setLocallyConfigured(false);
+ domain.setGenerationID(generationID);
}
/**
@@ -485,7 +435,7 @@
// Unsupported message type: should not happen
throw new IllegalArgumentException("Unexpected PDU type: "
- + msg.getClass().getName() + " :\n" + msg);
+ + msg.getClass().getName() + ":\n" + msg);
}
/**
@@ -733,8 +683,10 @@
@Override
public String toString()
{
- return "Url:" + getServerURL() + " ServerId:" + getServerId()
- + " GroupId:" + getGroupId();
+ return "ReplServerInfo Url:" + getServerURL()
+ + " ServerId:" + getServerId()
+ + " GroupId:" + getGroupId()
+ + " connectedDSs:" + connectedDSs;
}
}
@@ -860,9 +812,11 @@
+ "elect the preferred one");
// Get info from every available replication servers
- replicationServerInfos = collectReplicationServersInfo();
+ Map<Integer, ReplicationServerInfo> rsInfos =
+ collectReplicationServersInfo();
+ computeNewTopology(toRSInfos(rsInfos));
- if (replicationServerInfos.isEmpty())
+ if (rsInfos.isEmpty())
{
setConnectedRS(ConnectedRS.noConnectedRS());
}
@@ -870,7 +824,7 @@
{
// At least one server answered, find the best one.
RSEvaluations evals = computeBestReplicationServer(true, -1, state,
- replicationServerInfos, serverId, getGroupId(), getGenerationID());
+ rsInfos, serverId, getGroupId(), getGenerationID());
// Best found, now initialize connection to this one (handshake phase 1)
if (logger.isTraceEnabled())
@@ -886,8 +840,7 @@
Update replication server info with potentially more up to date
data (server state for instance may have changed)
*/
- replicationServerInfos
- .put(electedRsInfo.getServerId(), electedRsInfo);
+ rsInfos.put(electedRsInfo.getServerId(), electedRsInfo);
// Handshake phase 1 exchange went well
@@ -935,10 +888,10 @@
connectionError = true;
connectPhaseLock.notify();
- if (replicationServerInfos.size() > 0)
+ if (rsInfos.size() > 0)
{
logger.warn(WARN_COULD_NOT_FIND_CHANGELOG, serverId, baseDN.toNormalizedString(),
- Utils.joinAsString(", ", replicationServerInfos.keySet()));
+ Utils.joinAsString(", ", rsInfos.keySet()));
}
else
{
@@ -949,6 +902,43 @@
}
}
+ private void computeNewTopology(List<RSInfo> newRSInfos)
+ {
+ final int rsServerId = getRsServerId();
+
+ Topology oldTopo;
+ Topology newTopo;
+ do
+ {
+ oldTopo = topology.get();
+ newTopo = new Topology(oldTopo.replicaInfos, newRSInfos, getServerId(),
+ rsServerId, getReplicationServerUrls(), oldTopo.rsInfos);
+ }
+ while (!topology.compareAndSet(oldTopo, newTopo));
+
+ if (logger.isTraceEnabled())
+ {
+ debugInfo(topologyChange(rsServerId, oldTopo, newTopo));
+ }
+ }
+
+ private StringBuilder topologyChange(int rsServerId, Topology oldTopo,
+ Topology newTopo)
+ {
+ final StringBuilder sb = new StringBuilder();
+ sb.append("rsServerId=").append(rsServerId);
+ if (newTopo.equals(oldTopo))
+ {
+ sb.append(", unchangedTopology=").append(newTopo);
+ }
+ else
+ {
+ sb.append(", oldTopology=").append(oldTopo);
+ sb.append(", newTopology=").append(newTopo);
+ }
+ return sb;
+ }
+
/**
* Connects to a replication server.
*
@@ -2303,7 +2293,7 @@
if (logger.isTraceEnabled())
{
debugInfo("end restart : connected=" + rs.isConnected() + " with RS("
- + rs.getServerId() + ") genId=" + generationID);
+ + rs.getServerId() + ") genId=" + getGenerationID());
}
}
@@ -2408,7 +2398,8 @@
*/
credit =
currentWindowSemaphore.tryAcquire(500, TimeUnit.MILLISECONDS);
- } else
+ }
+ else
{
credit = true;
}
@@ -2451,6 +2442,11 @@
}
catch (IOException e)
{
+ if (logger.isTraceEnabled())
+ {
+ debugInfo("publish(): IOException caught: "
+ + stackTraceToSingleLineString(e));
+ }
if (!retryOnFailure)
{
return false;
@@ -2463,23 +2459,24 @@
try
{
connectPhaseLock.wait(100);
- } catch (InterruptedException e1)
+ }
+ catch (InterruptedException ignored)
{
- // ignore
if (logger.isTraceEnabled())
{
- debugInfo("publish(): Interrupted exception raised : "
- + e.getLocalizedMessage());
+ debugInfo("publish(): InterruptedException caught 1: "
+ + stackTraceToSingleLineString(ignored));
}
}
}
- } catch (InterruptedException e)
+ }
+ catch (InterruptedException ignored)
{
// just loop.
if (logger.isTraceEnabled())
{
- debugInfo("publish(): Interrupted exception raised."
- + e.getLocalizedMessage());
+ debugInfo("publish(): InterruptedException caught 2: "
+ + stackTraceToSingleLineString(ignored));
}
}
}
@@ -2607,9 +2604,10 @@
}
// Update the replication servers ServerStates with new received info
+ Map<Integer, ReplicationServerInfo> rsInfos = topology.get().rsInfos;
for (int srvId : toIterable(monitorMsg.rsIterator()))
{
- ReplicationServerInfo rsInfo = replicationServerInfos.get(srvId);
+ final ReplicationServerInfo rsInfo = rsInfos.get(srvId);
if (rsInfo != null)
{
rsInfo.update(monitorMsg.getRSServerState(srvId));
@@ -2629,9 +2627,9 @@
{
// Stable topology (no topo msg since few seconds): proceed with
// best server checking.
- final RSEvaluations evals =
- computeBestReplicationServer(false, previousRsServerID, state,
- replicationServerInfos, serverId, getGroupId(), generationID);
+ final RSEvaluations evals = computeBestReplicationServer(
+ false, previousRsServerID, state,
+ rsInfos, serverId, getGroupId(), getGenerationID());
final ReplicationServerInfo bestServerInfo = evals.getBestRS();
if (previousRsServerID != -1
&& (bestServerInfo == null
@@ -2951,9 +2949,9 @@
* Gets the info for DSs in the topology (except us).
* @return The info for DSs in the topology (except us)
*/
- public List<DSInfo> getDsList()
+ public Map<Integer, DSInfo> getReplicaInfos()
{
- return dsList;
+ return topology.get().replicaInfos;
}
/**
@@ -2962,10 +2960,15 @@
* @return The info for RSs in the topology (except the one we are connected
* to)
*/
- public List<RSInfo> getRsList()
+ public List<RSInfo> getRsInfos()
+ {
+ return toRSInfos(topology.get().rsInfos);
+ }
+
+ private List<RSInfo> toRSInfos(Map<Integer, ReplicationServerInfo> rsInfos)
{
final List<RSInfo> result = new ArrayList<RSInfo>();
- for (ReplicationServerInfo rsInfo : replicationServerInfos.values())
+ for (ReplicationServerInfo rsInfo : rsInfos.values())
{
result.add(rsInfo.toRSInfo());
}
@@ -2973,39 +2976,6 @@
}
/**
- * Computes the list of DSs connected to a particular RS.
- * @param rsId The RS id of the server one wants to know the connected DSs
- * @param dsList The list of DSinfo from which to compute things
- * @param rsServerId the serverId to use for the connectedDS
- * @return The list of connected DSs to the server rsId
- */
- private Set<Integer> computeConnectedDSs(int rsId, List<DSInfo> dsList,
- int rsServerId)
- {
- final Set<Integer> connectedDSs = new HashSet<Integer>();
- if (rsServerId == rsId)
- {
- /*
- If we are computing connected DSs for the RS we are connected
- to, we should count the local DS as the DSInfo of the local DS is not
- sent by the replication server in the topology message. We must count
- ourselves as a connected server.
- */
- connectedDSs.add(getServerId());
- }
-
- for (DSInfo dsInfo : dsList)
- {
- if (dsInfo.getRsId() == rsId)
- {
- connectedDSs.add(dsInfo.getDsId());
- }
- }
-
- return connectedDSs;
- }
-
- /**
* Processes an incoming TopologyMsg.
* Updates the structures for the local view of the topology.
*
@@ -3016,42 +2986,298 @@
*/
private void receiveTopo(TopologyMsg topoMsg, int rsServerId)
{
- if (logger.isTraceEnabled())
- debugInfo("receive TopologyMsg=" + topoMsg);
-
- // Store new DS list
- dsList = topoMsg.getDsList();
-
- // Update replication server info list with the received topology
- // information
- final Set<Integer> rssToKeep = new HashSet<Integer>();
- for (RSInfo rsInfo : topoMsg.getRsList())
+ final Topology newTopo = computeNewTopology(topoMsg, rsServerId);
+ for (DSInfo dsInfo : newTopo.replicaInfos.values())
{
- final int rsId = rsInfo.getId();
- rssToKeep.add(rsId); // Mark this server as still existing
- Set<Integer> connectedDSs = computeConnectedDSs(rsId, dsList, rsServerId);
- ReplicationServerInfo rsInfo2 = replicationServerInfos.get(rsId);
- if (rsInfo2 == null)
- {
- // New replication server, create info for it add it to the list
- rsInfo2 = new ReplicationServerInfo(rsInfo, connectedDSs);
- setLocallyConfiguredFlag(rsInfo2);
- replicationServerInfos.put(rsId, rsInfo2);
- }
- else
- {
- // Update the existing info for the replication server
- rsInfo2.update(rsInfo, connectedDSs);
- }
+ domain.setEclIncludes(dsInfo.getDsId(), dsInfo.getEclIncludes(), dsInfo
+ .getEclIncludesForDeletes());
+ }
+ }
+
+ private Topology computeNewTopology(TopologyMsg topoMsg, int rsServerId)
+ {
+ Topology oldTopo;
+ Topology newTopo;
+ do
+ {
+ oldTopo = topology.get();
+ newTopo = new Topology(topoMsg, getServerId(), rsServerId,
+ getReplicationServerUrls(), oldTopo.rsInfos);
+ }
+ while (!topology.compareAndSet(oldTopo, newTopo));
+
+ if (logger.isTraceEnabled())
+ {
+ final StringBuilder sb = topologyChange(rsServerId, oldTopo, newTopo);
+ sb.append(" received TopologyMsg=").append(topoMsg);
+ debugInfo(sb);
+ }
+ return newTopo;
+ }
+
+ /**
+ * Contains the last known state of the replication topology.
+ */
+ static final class Topology
+ {
+
+ /**
+ * The RS's serverId that this DS was connected to when this topology state
+ * was computed.
+ */
+ private final int rsServerId;
+ /**
+ * Info for other DSs.
+ * <p>
+ * Warning: does not contain info for us (for our server id)
+ */
+ final Map<Integer, DSInfo> replicaInfos;
+ /**
+ * The map of replication server info initialized at connection time and
+ * regularly updated. This is used to decide to which best suitable
+ * replication server one wants to connect. Key: replication server id
+ * Value: replication server info for the matching replication server id
+ */
+ final Map<Integer, ReplicationServerInfo> rsInfos;
+
+ private Topology()
+ {
+ this.rsServerId = -1;
+ this.replicaInfos = Collections.emptyMap();
+ this.rsInfos = Collections.emptyMap();
}
- // Remove any replication server that may have disappeared from the topology
- replicationServerInfos.keySet().retainAll(rssToKeep);
-
- for (DSInfo info : dsList)
+ /**
+ * Constructor to use when only the RSInfos need to be recomputed.
+ *
+ * @param dsInfosToKeep
+ * the DSInfos that will be stored as is
+ * @param newRSInfos
+ * the new RSInfos from which to compute the new topology
+ * @param dsServerId
+ * the DS serverId
+ * @param rsServerId
+ * the current connected RS serverId
+ * @param configuredReplicationServerUrls
+ * the configured replication server URLs
+ * @param previousRsInfos
+ * the RSInfos computed in the previous Topology object
+ */
+ Topology(Map<Integer, DSInfo> dsInfosToKeep, List<RSInfo> newRSInfos,
+ int dsServerId, int rsServerId,
+ Set<String> configuredReplicationServerUrls,
+ Map<Integer, ReplicationServerInfo> previousRsInfos)
{
- domain.setEclIncludes(info.getDsId(), info.getEclIncludes(),
- info.getEclIncludesForDeletes());
+ this.rsServerId = rsServerId;
+ this.replicaInfos = dsInfosToKeep;
+ this.rsInfos = computeRSInfos(dsServerId, newRSInfos,
+ previousRsInfos, configuredReplicationServerUrls);
+ }
+
+ /**
+ * Constructor to use when a new TopologyMsg has been received.
+ *
+ * @param topoMsg
+ * the topology message containing the new DSInfos and RSInfos from
+ * which to compute the new topology
+ * @param dsServerId
+ * the DS serverId
+ * @param rsServerId
+ * the current connected RS serverId
+ * @param configuredReplicationServerUrls
+ * the configured replication server URLs
+ * @param previousRsInfos
+ * the RSInfos computed in the previous Topology object
+ */
+ Topology(TopologyMsg topoMsg, int dsServerId,
+ int rsServerId, Set<String> configuredReplicationServerUrls,
+ Map<Integer, ReplicationServerInfo> previousRsInfos)
+ {
+ this.rsServerId = rsServerId;
+ this.replicaInfos = removeThisDs(topoMsg.getReplicaInfos(), dsServerId);
+ this.rsInfos = computeRSInfos(dsServerId, topoMsg.getRsInfos(),
+ previousRsInfos, configuredReplicationServerUrls);
+ }
+
+ private Map<Integer, DSInfo> removeThisDs(Map<Integer, DSInfo> dsInfos,
+ int dsServerId)
+ {
+ final Map<Integer, DSInfo> copy = new HashMap<Integer, DSInfo>(dsInfos);
+ copy.remove(dsServerId);
+ return Collections.unmodifiableMap(copy);
+ }
+
+ private Map<Integer, ReplicationServerInfo> computeRSInfos(
+ int dsServerId, List<RSInfo> newRsInfos,
+ Map<Integer, ReplicationServerInfo> previousRsInfos,
+ Set<String> configuredReplicationServerUrls)
+ {
+ final Map<Integer, ReplicationServerInfo> results =
+ new HashMap<Integer, ReplicationServerInfo>(previousRsInfos);
+
+ // Update replication server info list with the received topology info
+ final Set<Integer> rssToKeep = new HashSet<Integer>();
+ for (RSInfo newRSInfo : newRsInfos)
+ {
+ final int rsId = newRSInfo.getId();
+ rssToKeep.add(rsId); // Mark this server as still existing
+ Set<Integer> connectedDSs =
+ computeDSsConnectedTo(rsId, dsServerId);
+ ReplicationServerInfo rsInfo = results.get(rsId);
+ if (rsInfo == null)
+ {
+ // New replication server, create info for it add it to the list
+ rsInfo = new ReplicationServerInfo(newRSInfo, connectedDSs);
+ setLocallyConfiguredFlag(rsInfo, configuredReplicationServerUrls);
+ results.put(rsId, rsInfo);
+ }
+ else
+ {
+ // Update the existing info for the replication server
+ rsInfo.update(newRSInfo, connectedDSs);
+ }
+ }
+
+ // Remove any replication server that may have disappeared from the
+ // topology
+ results.keySet().retainAll(rssToKeep);
+
+ return Collections.unmodifiableMap(results);
+ }
+
+ /** Computes the list of DSs connected to a particular RS. */
+ private Set<Integer> computeDSsConnectedTo(int rsId, int dsServerId)
+ {
+ final Set<Integer> connectedDSs = new HashSet<Integer>();
+ if (rsServerId == rsId)
+ {
+ /*
+ * If we are computing connected DSs for the RS we are connected to, we
+ * should count the local DS as the DSInfo of the local DS is not sent
+ * by the replication server in the topology message. We must count
+ * ourselves as a connected server.
+ */
+ connectedDSs.add(dsServerId);
+ }
+
+ for (DSInfo dsInfo : replicaInfos.values())
+ {
+ if (dsInfo.getRsId() == rsId)
+ {
+ connectedDSs.add(dsInfo.getDsId());
+ }
+ }
+
+ return connectedDSs;
+ }
+
+ /**
+ * Sets the locally configured flag for the passed ReplicationServerInfo
+ * object, analyzing the local configuration.
+ *
+ * @param rsInfo
+ * the Replication server to check and update
+ * @param configuredReplicationServerUrls
+ */
+ private void setLocallyConfiguredFlag(ReplicationServerInfo rsInfo,
+ Set<String> configuredReplicationServerUrls)
+ {
+ // Determine if the passed ReplicationServerInfo has a URL that is present
+ // in the locally configured replication servers
+ String rsUrl = rsInfo.getServerURL();
+ if (rsUrl == null)
+ {
+ // The ReplicationServerInfo has been generated from a server with
+ // no URL in TopologyMsg (i.e: with replication protocol version < 4):
+ // ignore this server as we do not know how to connect to it
+ rsInfo.setLocallyConfigured(false);
+ return;
+ }
+ for (String serverUrl : configuredReplicationServerUrls)
+ {
+ if (isSameReplicationServerUrl(serverUrl, rsUrl))
+ {
+ // This RS is locally configured, mark this
+ rsInfo.setLocallyConfigured(true);
+ rsInfo.setServerURL(serverUrl);
+ return;
+ }
+ }
+ rsInfo.setLocallyConfigured(false);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public boolean equals(Object obj)
+ {
+ if (this == obj)
+ {
+ return true;
+ }
+ if (obj == null || getClass() != obj.getClass())
+ {
+ return false;
+ }
+ final Topology other = (Topology) obj;
+ return rsServerId == other.rsServerId
+ && equals(replicaInfos, other.replicaInfos)
+ && equals(rsInfos, other.rsInfos)
+ && urlsEqual1(replicaInfos, other.replicaInfos)
+ && urlsEqual2(rsInfos, other.rsInfos);
+ }
+
+ private boolean equals(Object o1, Object o2)
+ {
+ return o1 == null ? o2 == null : o1.equals(o2);
+ }
+
+ private boolean urlsEqual1(Map<Integer, DSInfo> replicaInfos1,
+ Map<Integer, DSInfo> replicaInfos2)
+ {
+ for (Entry<Integer, DSInfo> entry : replicaInfos1.entrySet())
+ {
+ DSInfo dsInfo = replicaInfos2.get(entry.getKey());
+ if (!equals(entry.getValue().getDsUrl(), dsInfo.getDsUrl()))
+ {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ private boolean urlsEqual2(Map<Integer, ReplicationServerInfo> rsInfos1,
+ Map<Integer, ReplicationServerInfo> rsInfos2)
+ {
+ for (Entry<Integer, ReplicationServerInfo> entry : rsInfos1.entrySet())
+ {
+ ReplicationServerInfo rsInfo = rsInfos2.get(entry.getKey());
+ if (!equals(entry.getValue().getServerURL(), rsInfo.getServerURL()))
+ {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public int hashCode()
+ {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + rsServerId;
+ result = prime * result
+ + (replicaInfos == null ? 0 : replicaInfos.hashCode());
+ result = prime * result + (rsInfos == null ? 0 : rsInfos.hashCode());
+ return result;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public String toString()
+ {
+ return "rsServerId=" + rsServerId + ", replicaInfos=" + replicaInfos
+ + ", rsInfos=" + rsInfos.values();
}
}
@@ -3197,15 +3423,15 @@
.append(" \"").append(getBaseDN()).append(" ")
.append(getServerId()).append("\",")
.append(" groupId=").append(getGroupId())
- .append(", genId=").append(generationID)
+ .append(", genId=").append(getGenerationID())
.append(", ");
connectedRS.get().toString(sb);
return sb.toString();
}
- private void debugInfo(String message)
+ private void debugInfo(CharSequence message)
{
logger.trace(getClass().getSimpleName() + " for baseDN=" + getBaseDN()
- + " and serverId=" + getServerId() + " " + message);
+ + " and serverId=" + getServerId() + ": " + message);
}
}
diff --git a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/service/ReplicationDomain.java b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/service/ReplicationDomain.java
index 60a2b4a..1323777 100644
--- a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/service/ReplicationDomain.java
+++ b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/service/ReplicationDomain.java
@@ -95,15 +95,118 @@
* <p>
* Full Initialization of a replica can be triggered by LDAP clients
* by creating InitializeTasks or InitializeTargetTask.
- * Full initialization can also by triggered from the ReplicationDomain
- * implementation using methods {@link #initializeRemote(int)}
- * or {@link #initializeFromRemote(int)}.
+ * Full initialization can also be triggered from the ReplicationDomain
+ * implementation using methods {@link #initializeRemote(int, Task)}
+ * or {@link #initializeFromRemote(int, Task)}.
* <p>
* At shutdown time, the {@link #disableService()} method should be called to
* cleanly stop the replication service.
*/
public abstract class ReplicationDomain
{
+
+ /**
+ * Contains all the attributes included for the ECL (External Changelog).
+ */
+ // @Immutable
+ private final static class ECLIncludes
+ {
+
+ final Map<Integer, Set<String>> includedAttrsByServer;
+ final Set<String> includedAttrsAllServers;
+
+ final Map<Integer, Set<String>> includedAttrsForDeletesByServer;
+ final Set<String> includedAttrsForDeletesAllServers;
+
+ private ECLIncludes(
+ Map<Integer, Set<String>> includedAttrsByServer,
+ Set<String> includedAttrsAllServers,
+ Map<Integer, Set<String>> includedAttrsForDeletesByServer,
+ Set<String> includedAttrsForDeletesAllServers)
+ {
+ this.includedAttrsByServer = includedAttrsByServer;
+ this.includedAttrsAllServers = includedAttrsAllServers;
+
+ this.includedAttrsForDeletesByServer = includedAttrsForDeletesByServer;
+ this.includedAttrsForDeletesAllServers =includedAttrsForDeletesAllServers;
+ }
+
+ @SuppressWarnings("unchecked")
+ public ECLIncludes()
+ {
+ this(Collections.EMPTY_MAP, Collections.EMPTY_SET, Collections.EMPTY_MAP,
+ Collections.EMPTY_SET);
+ }
+
+ /**
+ * Add attributes to be included in the ECL.
+ *
+ * @param serverId
+ * Server where these attributes are configured.
+ * @param includeAttributes
+ * Attributes to be included with all change records, may include
+ * wild-cards.
+ * @param includeAttributesForDeletes
+ * Additional attributes to be included with delete change records,
+ * may include wild-cards.
+ * @return a new {@link ECLIncludes} object if included attributes have
+ * changed, or the current object otherwise.
+ */
+ public ECLIncludes addIncludedAttributes(int serverId,
+ Set<String> includeAttributes, Set<String> includeAttributesForDeletes)
+ {
+ boolean configurationChanged = false;
+
+ Set<String> s1 = new HashSet<String>(includeAttributes);
+
+ // Combine all+delete attributes.
+ Set<String> s2 = new HashSet<String>(s1);
+ s2.addAll(includeAttributesForDeletes);
+
+ Map<Integer,Set<String>> eclIncludesByServer = this.includedAttrsByServer;
+ if (!s1.equals(this.includedAttrsByServer.get(serverId)))
+ {
+ configurationChanged = true;
+ eclIncludesByServer = new HashMap<Integer, Set<String>>(
+ this.includedAttrsByServer);
+ eclIncludesByServer.put(serverId, Collections.unmodifiableSet(s1));
+ }
+
+ Map<Integer, Set<String>> eclIncludesForDeletesByServer =
+ this.includedAttrsForDeletesByServer;
+ if (!s2.equals(this.includedAttrsForDeletesByServer.get(serverId)))
+ {
+ configurationChanged = true;
+ eclIncludesForDeletesByServer = new HashMap<Integer, Set<String>>(
+ this.includedAttrsForDeletesByServer);
+ eclIncludesForDeletesByServer.put(
+ serverId, Collections.unmodifiableSet(s2));
+ }
+
+ if (!configurationChanged)
+ {
+ return this;
+ }
+
+ // and rebuild the global list to be ready for usage
+ Set<String> eclIncludesAllServer = new HashSet<String>();
+ for (Set<String> attributes : eclIncludesByServer.values())
+ {
+ eclIncludesAllServer.addAll(attributes);
+ }
+
+ Set<String> eclIncludesForDeletesAllServer = new HashSet<String>();
+ for (Set<String> attributes : eclIncludesForDeletesByServer.values())
+ {
+ eclIncludesForDeletesAllServer.addAll(attributes);
+ }
+ return new ECLIncludes(eclIncludesByServer,
+ Collections.unmodifiableSet(eclIncludesAllServer),
+ eclIncludesForDeletesByServer,
+ Collections.unmodifiableSet(eclIncludesForDeletesAllServer));
+ }
+ }
+
/**
* Current status for this replicated domain.
*/
@@ -251,14 +354,8 @@
*/
private final CSNGenerator generator;
- private final Object eclIncludesLock = new Object();
- private final Map<Integer, Set<String>> eclIncludesByServer =
- new HashMap<Integer, Set<String>>();
- private Set<String> eclIncludesAllServers = Collections.emptySet();
-
- private final Map<Integer, Set<String>> eclIncludesForDeletesByServer =
- new HashMap<Integer, Set<String>>();
- private Set<String> eclIncludesForDeletesAllServers = Collections.emptySet();
+ private final AtomicReference<ECLIncludes> eclIncludes =
+ new AtomicReference<ECLIncludes>(new ECLIncludes());
/**
* An object used to protect the initialization of the underlying broker
@@ -551,9 +648,9 @@
* Gets the info for Replicas in the topology (except us).
* @return The info for Replicas in the topology (except us)
*/
- public List<DSInfo> getReplicasList()
+ public Map<Integer, DSInfo> getReplicaInfos()
{
- return broker.getDsList();
+ return broker.getReplicaInfos();
}
/**
@@ -562,20 +659,13 @@
* disconnected. Return null when no server with the provided serverId is
* connected.
*
- * @param serverId The provided serverId of the remote replica
+ * @param dsId The provided serverId of the remote replica
* @return the info related to this remote server if it is connected,
* null is the server is NOT connected.
*/
- public DSInfo isRemoteDSConnected(int serverId)
+ private DSInfo isRemoteDSConnected(int dsId)
{
- for (DSInfo remoteDS : getReplicasList())
- {
- if (remoteDS.getDsId() == serverId)
- {
- return remoteDS;
- }
- }
- return null;
+ return getReplicaInfos().get(dsId);
}
/**
@@ -601,9 +691,9 @@
* @return The info for RSs in the topology (except the one we are connected
* to)
*/
- public List<RSInfo> getRsList()
+ public List<RSInfo> getRsInfos()
{
- return broker.getRsList();
+ return broker.getRsInfos();
}
@@ -1100,7 +1190,7 @@
* for and import, false if the IEContext
* will be used for and export.
*/
- public IEContext(boolean importInProgress)
+ private IEContext(boolean importInProgress)
{
this.importInProgress = importInProgress;
this.startTime = System.currentTimeMillis();
@@ -1114,7 +1204,7 @@
* @return A boolean indicating if a total update import is currently in
* Progress.
*/
- public boolean importInProgress()
+ boolean importInProgress()
{
return importInProgress;
}
@@ -1153,18 +1243,17 @@
entryCount = total;
entryLeftCount = total;
- if (initializeTask != null)
+ if (initializeTask instanceof InitializeTask)
{
- if (initializeTask instanceof InitializeTask)
- {
- ((InitializeTask)initializeTask).setTotal(entryCount);
- ((InitializeTask)initializeTask).setLeft(entryCount);
- }
- else if (initializeTask instanceof InitializeTargetTask)
- {
- ((InitializeTargetTask)initializeTask).setTotal(entryCount);
- ((InitializeTargetTask)initializeTask).setLeft(entryCount);
- }
+ final InitializeTask task = (InitializeTask) initializeTask;
+ task.setTotal(entryCount);
+ task.setLeft(entryCount);
+ }
+ else if (initializeTask instanceof InitializeTargetTask)
+ {
+ final InitializeTargetTask task = (InitializeTargetTask) initializeTask;
+ task.setTotal(entryCount);
+ task.setLeft(entryCount);
}
}
@@ -1177,7 +1266,7 @@
*
* @throws DirectoryException if an error occurred.
*/
- public void updateCounters(int entriesDone) throws DirectoryException
+ private void updateCounters(int entriesDone) throws DirectoryException
{
entryLeftCount -= entriesDone;
@@ -1198,7 +1287,7 @@
@Override
public String toString()
{
- return "[ Entry count=" + this.entryCount +
+ return "[Entry count=" + this.entryCount +
", Entry left count=" + this.entryLeftCount + "]";
}
@@ -1258,7 +1347,7 @@
* @param serverId serverId of the acknowledger/receiver/importer server.
* @param numAck id of the message received.
*/
- public void setAckVal(int serverId, int numAck)
+ private void setAckVal(int serverId, int numAck)
{
if (logger.isTraceEnabled())
logger.trace("[IE] setAckVal[" + serverId + "]=" + numAck);
@@ -1315,6 +1404,7 @@
if (target >= 0)
{
// FIXME Could we check now that it is a know server in the domain ?
+ // JNR: Yes please
}
return target;
}
@@ -1338,7 +1428,7 @@
*
* @param target The server-id of the server that should be initialized.
* The target can be discovered using the
- * {@link #getReplicasList()} method.
+ * {@link #getReplicaInfos()} method.
* @param initTask The task that triggers this initialization and that should
* be updated with its progress.
*
@@ -1386,13 +1476,10 @@
logger.info(NOTE_FULL_UPDATE_ENGAGED_FOR_REMOTE_START_ALL,
countEntries(), getBaseDNString(), getServerId());
- for (DSInfo dsi : getReplicasList())
- {
- ieCtx.startList.add(dsi.getDsId());
- }
+ ieCtx.startList.addAll(getReplicaInfos().keySet());
// We manage the list of servers with which a flow control can be enabled
- for (DSInfo dsi : getReplicasList())
+ for (DSInfo dsi : getReplicaInfos().values())
{
if (dsi.getProtocolVersion()>= ProtocolVersion.REPLICATION_PROTOCOL_V4)
{
@@ -1408,7 +1495,7 @@
ieCtx.startList.add(serverToInitialize);
// We manage the list of servers with which a flow control can be enabled
- for (DSInfo dsi : getReplicasList())
+ for (DSInfo dsi : getReplicaInfos().values())
{
if (dsi.getDsId() == serverToInitialize &&
dsi.getProtocolVersion()>= ProtocolVersion.REPLICATION_PROTOCOL_V4)
@@ -1453,7 +1540,7 @@
{
throw new DirectoryException(
ResultCode.OTHER,
- ERR_INIT_NO_SUCCESS_START_FROM_SERVERS.get(
+ ERR_INIT_NO_SUCCESS_START_FROM_SERVERS.get(getBaseDNString(),
ieCtx.failureList));
}
@@ -1472,7 +1559,7 @@
if (logger.isTraceEnabled())
logger.trace("[IE] In " + broker.getReplicationMonitorInstanceName()
- + " export ends with " + " connected=" + broker.isConnected()
+ + " export ends with connected=" + broker.isConnected()
+ " exportRootException=" + exportRootException);
if (exportRootException != null)
@@ -1592,7 +1679,7 @@
do
{
done = true;
- for (DSInfo dsi : getReplicasList())
+ for (DSInfo dsi : getReplicaInfos().values())
{
if (logger.isTraceEnabled())
logger.trace(
@@ -1650,10 +1737,7 @@
considered in the processing of sorting the successfully initialized
and the others
*/
- for (DSInfo dsi : getReplicasList())
- {
- replicasWeAreWaitingFor.add(dsi.getDsId());
- }
+ replicasWeAreWaitingFor.addAll(getReplicaInfos().keySet());
boolean done;
do
@@ -1698,13 +1782,11 @@
done = false;
break;
}
- else
- {
- if (dsInfo.getGenerationId() == getGenerationID())
- { // and with the expected generationId
- // We're done with this server
- it.remove();
- }
+
+ if (dsInfo.getGenerationId() == getGenerationID())
+ { // and with the expected generationId
+ // We're done with this server
+ it.remove();
}
}
}
@@ -1717,7 +1799,6 @@
Thread.currentThread().interrupt();
} // 1sec
}
-
}
while (!done && !broker.shuttingDown()); // infinite wait
@@ -1967,8 +2048,8 @@
*
* @throws IOException when an error occurred.
*/
- public void exportLDIFEntry(byte[] lDIFEntry, int pos, int length)
- throws IOException
+ void exportLDIFEntry(byte[] lDIFEntry, int pos, int length)
+ throws IOException
{
if (logger.isTraceEnabled())
logger.trace("[IE] Entering exportLDIFEntry entry=" +
@@ -2072,53 +2153,6 @@
}
/**
- * Initializes this domain from another source server.
- * <p>
- * When this method is called, a request for initialization will
- * be sent to the source server asking for initialization.
- * <p>
- * The {@code exportBackend(OutputStream)} will therefore be called
- * on the source server, and the {@code importBackend(InputStream)}
- * will be called on his server.
- * <p>
- * The InputStream and OutpuStream given as a parameter to those
- * methods will be connected through the replication protocol.
- *
- * @param source The server-id of the source from which to initialize.
- * The source can be discovered using the
- * {@link #getReplicasList()} method.
- *
- * @throws DirectoryException If it was not possible to publish the
- * Initialization message to the Topology.
- */
- public void initializeFromRemote(int source) throws DirectoryException
- {
- initializeFromRemote(source, null);
- }
-
- /**
- * Initializes a remote server from this server.
- * <p>
- * The {@code exportBackend(OutputStream)} will therefore be called
- * on this server, and the {@code importBackend(InputStream)}
- * will be called on the remote server.
- * <p>
- * The InputStream and OutpuStream given as a parameter to those
- * methods will be connected through the replication protocol.
- *
- * @param target The server-id of the server that should be initialized.
- * The target can be discovered using the
- * {@link #getReplicasList()} method.
- *
- * @throws DirectoryException If it was not possible to publish the
- * Initialization message to the Topology.
- */
- public void initializeRemote(int target) throws DirectoryException
- {
- initializeRemote(target, null);
- }
-
- /**
* Initializes asynchronously this domain from a remote source server.
* Before returning from this call, for the provided task :
* - the progressing counters are updated during the initialization using
@@ -2131,7 +2165,7 @@
*
* @param source The server-id of the source from which to initialize.
* The source can be discovered using the
- * {@link #getReplicasList()} method.
+ * {@link #getReplicaInfos()} method.
*
* @param initTask The task that launched the initialization
* and should be updated of its progress.
@@ -2217,7 +2251,7 @@
* task has initially been created (this server,
* or the remote server).
*/
- void initialize(InitializeTargetMsg initTargetMsgReceived,
+ private void initialize(InitializeTargetMsg initTargetMsgReceived,
int requesterServerId)
{
InitializeTask initFromTask = null;
@@ -2254,7 +2288,6 @@
ieCtx.importSource = source;
ieCtx.initializeCounters(initTargetMsgReceived.getEntryCount());
ieCtx.initWindow = initTargetMsgReceived.getInitWindow();
- // Protocol version is -1 when not known.
ieCtx.exporterProtocolVersion = getProtocolVersion(source);
initFromTask = (InitializeTask) ieCtx.initializeTask;
@@ -2382,18 +2415,14 @@
* @param dsServerId The provided serverId.
* @return The protocol version.
*/
- short getProtocolVersion(int dsServerId)
+ private short getProtocolVersion(int dsServerId)
{
- short protocolVersion = -1;
- for (DSInfo dsi : getReplicasList())
+ final DSInfo dsInfo = getReplicaInfos().get(dsServerId);
+ if (dsInfo != null)
{
- if (dsi.getDsId() == dsServerId)
- {
- protocolVersion = dsi.getProtocolVersion();
- break;
- }
+ return dsInfo.getProtocolVersion();
}
- return protocolVersion;
+ return -1;
}
/**
@@ -2459,7 +2488,7 @@
for (int i = 0; i< 50; i++)
{
allSet = true;
- for (RSInfo rsInfo : getRsList())
+ for (RSInfo rsInfo : getRsInfos())
{
// the 'empty' RSes (generationId==-1) are considered as good citizens
if (rsInfo.getGenerationId() != -1 &&
@@ -2498,7 +2527,7 @@
* connected to a Replication Server or it
* was not possible to contact it.
*/
- public void resetReplicationLog() throws DirectoryException
+ void resetReplicationLog() throws DirectoryException
{
// Reset the Generation ID to -1 to clean the ReplicationServers.
resetGenerationId(-1L);
@@ -2913,7 +2942,7 @@
{
// create the broker object used to publish and receive changes
broker = new ReplicationBroker(
- this, state, config, getGenerationID(), new ReplSessionSecurity());
+ this, state, config, new ReplSessionSecurity());
broker.start();
}
}
@@ -3067,8 +3096,8 @@
}
/**
- * Applies a configuration change to the attributes which should be be
- * included in the ECL.
+ * Applies a configuration change to the attributes which should be included
+ * in the ECL.
*
* @param includeAttributes
* attributes to be included with all change records.
@@ -3385,7 +3414,7 @@
* @param msg The byte array containing the information that should
* be sent to the remote entities.
*/
- public void publish(byte[] msg)
+ void publish(byte[] msg)
{
UpdateMsg update;
synchronized (this)
@@ -3489,46 +3518,16 @@
Set<String> includeAttributes,
Set<String> includeAttributesForDeletes)
{
- boolean configurationChanged = false;
-
- synchronized (eclIncludesLock)
+ ECLIncludes current;
+ ECLIncludes updated;
+ do
{
- Set<String> s1 = new HashSet<String>(includeAttributes);
-
- // Combine all+delete attributes.
- Set<String> s2 = new HashSet<String>(s1);
- s2.addAll(includeAttributesForDeletes);
-
- if (!s1.equals(eclIncludesByServer.get(serverId)))
- {
- configurationChanged = true;
- eclIncludesByServer.put(serverId, Collections.unmodifiableSet(s1));
- }
-
- if (!s2.equals(eclIncludesForDeletesByServer.get(serverId)))
- {
- configurationChanged = true;
- eclIncludesForDeletesByServer.put(serverId,
- Collections.unmodifiableSet(s2));
- }
-
- // and rebuild the global list to be ready for usage
- Set<String> s = new HashSet<String>();
- for (Set<String> attributes : eclIncludesByServer.values())
- {
- s.addAll(attributes);
- }
- eclIncludesAllServers = Collections.unmodifiableSet(s);
-
- s = new HashSet<String>();
- for (Set<String> attributes : eclIncludesForDeletesByServer.values())
- {
- s.addAll(attributes);
- }
- eclIncludesForDeletesAllServers = Collections.unmodifiableSet(s);
+ current = this.eclIncludes.get();
+ updated = current.addIncludedAttributes(
+ serverId, includeAttributes, includeAttributesForDeletes);
}
-
- return configurationChanged;
+ while (!this.eclIncludes.compareAndSet(current, updated));
+ return current != updated;
}
@@ -3540,10 +3539,7 @@
*/
public Set<String> getEclIncludes()
{
- synchronized (eclIncludesLock)
- {
- return eclIncludesAllServers;
- }
+ return eclIncludes.get().includedAttrsAllServers;
}
@@ -3555,10 +3551,7 @@
*/
public Set<String> getEclIncludesForDeletes()
{
- synchronized (eclIncludesLock)
- {
- return eclIncludesForDeletesAllServers;
- }
+ return eclIncludes.get().includedAttrsForDeletesAllServers;
}
@@ -3573,10 +3566,7 @@
*/
Set<String> getEclIncludes(int serverId)
{
- synchronized (eclIncludesLock)
- {
- return eclIncludesByServer.get(serverId);
- }
+ return eclIncludes.get().includedAttrsByServer.get(serverId);
}
@@ -3591,10 +3581,7 @@
*/
Set<String> getEclIncludesForDeletes(int serverId)
{
- synchronized (eclIncludesLock)
- {
- return eclIncludesForDeletesByServer.get(serverId);
- }
+ return eclIncludes.get().includedAttrsForDeletesByServer.get(serverId);
}
/**
diff --git a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/service/ReplicationMonitor.java b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/service/ReplicationMonitor.java
index 589f539..b3ca6b7 100644
--- a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/service/ReplicationMonitor.java
+++ b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/service/ReplicationMonitor.java
@@ -34,7 +34,7 @@
import org.opends.server.admin.std.server.MonitorProviderCfg;
import org.opends.server.api.MonitorProvider;
import org.opends.server.core.DirectoryServer;
-import org.opends.server.replication.service.ReplicationDomain.IEContext;
+import org.opends.server.replication.service.ReplicationDomain.*;
import org.opends.server.types.*;
/**
diff --git a/opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/ReplicationTestCase.java b/opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/ReplicationTestCase.java
index fee554b..207f48b 100644
--- a/opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/ReplicationTestCase.java
+++ b/opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/ReplicationTestCase.java
@@ -214,7 +214,7 @@
{
final ReplicationBroker broker = new ReplicationBroker(
new DummyReplicationDomain(generationId), new ServerState(),
- config, generationId, getReplSessionSecurity());
+ config, getReplSessionSecurity());
connect(broker, port, timeout);
return broker;
}
diff --git a/opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/FractionalReplicationTest.java b/opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/FractionalReplicationTest.java
index 8ba7ec4..9d56f00 100644
--- a/opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/FractionalReplicationTest.java
+++ b/opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/FractionalReplicationTest.java
@@ -37,6 +37,7 @@
import org.forgerock.i18n.slf4j.LocalizedLogger;
import org.forgerock.opendj.ldap.ModificationType;
import org.opends.server.TestCaseUtils;
+import org.opends.server.backends.task.Task;
import org.opends.server.config.ConfigException;
import org.opends.server.core.DirectoryServer;
import org.opends.server.replication.ReplicationTestCase;
@@ -639,6 +640,7 @@
private static final String REPLICATION_GENERATION_ID =
"ds-sync-generation-id";
+ private static final Task NO_INIT_TASK = null;
private long readGenIdFromSuffixRootEntry(String rootDn) throws Exception
{
@@ -1059,7 +1061,7 @@
replicationDomain.initExport(exportLdif, 2);
// Perform full update from fake domain to fractional domain
- replicationDomain.initializeRemote(DS1_ID);
+ replicationDomain.initializeRemote(DS1_ID, NO_INIT_TASK);
/*
* Check fractional domain is operational and that filtering has been done
@@ -1294,10 +1296,10 @@
replicationDomain.initExport(exportLdif, 2);
// Perform full update from fake domain to fractional domain
- replicationDomain.initializeRemote(DS1_ID);
+ replicationDomain.initializeRemote(DS1_ID, NO_INIT_TASK);
/*
- * Chack fractional domain is operational and that filtering has been done
+ * Check fractional domain is operational and that filtering has been done
* during the full update
*/
diff --git a/opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/HistoricalCsnOrderingTest.java b/opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/HistoricalCsnOrderingTest.java
index 219958d..f952725 100644
--- a/opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/HistoricalCsnOrderingTest.java
+++ b/opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/HistoricalCsnOrderingTest.java
@@ -73,7 +73,7 @@
public TestBroker(List<ReplicationMsg> list)
{
super(new DummyReplicationDomain(0), null,
- new DomainFakeCfg(null, 0, TestCaseUtils.<String> newSortedSet()), 0, null);
+ new DomainFakeCfg(null, 0, TestCaseUtils.<String> newSortedSet()), null);
this.list = list;
}
diff --git a/opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/StateMachineTest.java b/opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/StateMachineTest.java
index f44afdb..293f5da 100644
--- a/opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/StateMachineTest.java
+++ b/opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/StateMachineTest.java
@@ -29,7 +29,6 @@
import java.io.IOException;
import java.net.SocketTimeoutException;
import java.util.ArrayList;
-import java.util.List;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -228,7 +227,7 @@
fakeCfg.setChangetimeHeartbeatInterval(500);
ReplSessionSecurity security = new ReplSessionSecurity(null, null, null, true);
ReplicationBroker broker = new ReplicationBroker(
- new DummyReplicationDomain(generationId), state, fakeCfg, generationId, security);
+ new DummyReplicationDomain(generationId), state, fakeCfg, security);
broker.start();
checkConnection(30, broker, rs1Port);
return broker;
@@ -403,20 +402,13 @@
{
for (int count = 0; count< 50; count++)
{
- List<DSInfo> dsList = ds3.getDsList();
- DSInfo ds3Info = null;
- if (dsList.size() > 0)
- {
- ds3Info = dsList.get(0);
- }
- if (ds3Info != null
- && ds3Info.getDsId() == DS2_ID
- && ds3Info.getStatus() == ServerStatus.DEGRADED_STATUS)
+ DSInfo dsInfo = ds3.getReplicaInfos().get(DS2_ID);
+ if (dsInfo != null && dsInfo.getStatus() == ServerStatus.DEGRADED_STATUS)
{
break;
}
- assertTrue(count < 50, "DS2 did not get degraded : " + ds3Info);
+ assertTrue(count < 50, "DS2 did not get degraded : " + dsInfo);
Thread.sleep(200); // Be sure status analyzer has time to test
}
}
diff --git a/opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/TopologyViewTest.java b/opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/TopologyViewTest.java
index b14f113..70a7d3d 100644
--- a/opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/TopologyViewTest.java
+++ b/opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/TopologyViewTest.java
@@ -1020,10 +1020,11 @@
rd.getGroupId(), rd.getRefUrls(),
rd.getEclIncludes(), rd.getEclIncludesForDeletes(),
ProtocolVersion.getCurrentVersion());
- final List<DSInfo> dsList = new ArrayList<DSInfo>(rd.getReplicasList());
+ final List<DSInfo> dsList =
+ new ArrayList<DSInfo>(rd.getReplicaInfos().values());
dsList.add(dsInfo);
- TopoView dsTopoView = new TopoView(dsList, rd.getRsList());
+ TopoView dsTopoView = new TopoView(dsList, rd.getRsInfos());
assertEquals(dsTopoView, theoricalTopoView, " in DSid=" + currentDsId);
}
}
diff --git a/opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/ProtocolCompatibilityTest.java b/opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/ProtocolCompatibilityTest.java
index 9dd824d..32712bc 100644
--- a/opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/ProtocolCompatibilityTest.java
+++ b/opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/ProtocolCompatibilityTest.java
@@ -32,10 +32,12 @@
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
+import java.util.Set;
import java.util.zip.DataFormatException;
import org.forgerock.i18n.LocalizableMessage;
import org.forgerock.opendj.ldap.ModificationType;
+import org.assertj.core.api.Assertions;
import org.opends.server.core.AddOperationBasis;
import org.opends.server.core.DirectoryServer;
import org.opends.server.core.ModifyDNOperationBasis;
@@ -50,6 +52,7 @@
import org.testng.annotations.Test;
import static org.opends.messages.ReplicationMessages.*;
+import static org.opends.server.TestCaseUtils.*;
import static org.opends.server.replication.protocol.OperationContext.*;
import static org.opends.server.replication.protocol.ProtocolVersion.*;
import static org.opends.server.util.StaticUtils.*;
@@ -1036,61 +1039,47 @@
assertEquals(bi.toString(16), pdu);
}
- @DataProvider(name="createTopologyData")
+ @DataProvider
public Object [][] createTopologyData() throws Exception
{
- List<String> urls1 = new ArrayList<String>();
- urls1.add("ldap://ldap.iplanet.com/o=test??sub?(sn=Jensen)");
- urls1.add("ldaps://ldap.iplanet.com:4041/uid=bjensen,ou=People,o=test?cn,mail,telephoneNumber");
+ List<String> urls1 = newList(
+ "ldap://ldap.iplanet.com/o=test??sub?(sn=Jensen)",
+ "ldaps://ldap.iplanet.com:4041/uid=bjensen,ou=People,o=test?cn,mail,telephoneNumber");
- List<String> urls2 = new ArrayList<String>();
+ List<String> urls2 = newList();
- List<String> urls3 = new ArrayList<String>();
- urls3.add("ldaps://host:port/dc=foo??sub?(sn=One Entry)");
+ List<String> urls3 = newList(
+ "ldaps://host:port/dc=foo??sub?(sn=One Entry)");
- List<String> urls4 = new ArrayList<String>();
- urls4.add("ldaps://host:port/dc=foobar1??sub?(sn=Another Entry 1)");
- urls4.add("ldaps://host:port/dc=foobar2??sub?(sn=Another Entry 2)");
+ List<String> urls4 = newList(
+ "ldaps://host:port/dc=foobar1??sub?(sn=Another Entry 1)",
+ "ldaps://host:port/dc=foobar2??sub?(sn=Another Entry 2)");
- DSInfo dsInfo1 = new DSInfo(13, "dsHost1:111", 26, 154631, ServerStatus.FULL_UPDATE_STATUS,
+ DSInfo dsInfo1 = new DSInfo(13, "", 26, 154631, ServerStatus.FULL_UPDATE_STATUS,
false, AssuredMode.SAFE_DATA_MODE, (byte)12, (byte)132, urls1, new HashSet<String>(), new HashSet<String>(), (short)-1);
- DSInfo dsInfo2 = new DSInfo(-436, "dsHost2:222", 493, -227896, ServerStatus.DEGRADED_STATUS,
+ DSInfo dsInfo2 = new DSInfo(-436, "", 493, -227896, ServerStatus.DEGRADED_STATUS,
true, AssuredMode.SAFE_READ_MODE, (byte)-7, (byte)-265, urls2, new HashSet<String>(), new HashSet<String>(), (short)-1);
- DSInfo dsInfo3 = new DSInfo(2436, "dsHost3:333", 591, 0, ServerStatus.NORMAL_STATUS,
+ DSInfo dsInfo3 = new DSInfo(2436, "", 591, 0, ServerStatus.NORMAL_STATUS,
false, AssuredMode.SAFE_READ_MODE, (byte)17, (byte)0, urls3, new HashSet<String>(), new HashSet<String>(), (short)-1);
- DSInfo dsInfo4 = new DSInfo(415, "dsHost4:444", 146, 0, ServerStatus.BAD_GEN_ID_STATUS,
+ DSInfo dsInfo4 = new DSInfo(415, "", 146, 0, ServerStatus.BAD_GEN_ID_STATUS,
true, AssuredMode.SAFE_DATA_MODE, (byte)2, (byte)15, urls4, new HashSet<String>(), new HashSet<String>(), (short)-1);
- List<DSInfo> dsList1 = new ArrayList<DSInfo>();
- dsList1.add(dsInfo1);
-
- List<DSInfo> dsList2 = new ArrayList<DSInfo>();
-
- List<DSInfo> dsList3 = new ArrayList<DSInfo>();
- dsList3.add(dsInfo2);
-
- List<DSInfo> dsList4 = new ArrayList<DSInfo>();
- dsList4.add(dsInfo4);
- dsList4.add(dsInfo3);
- dsList4.add(dsInfo2);
- dsList4.add(dsInfo1);
+ Set<DSInfo> dsList1 = newSet(dsInfo1);
+ Set<DSInfo> dsList2 = newSet();
+ Set<DSInfo> dsList3 = newSet(dsInfo2);
+ Set<DSInfo> dsList4 = newSet(dsInfo4, dsInfo3, dsInfo2, dsInfo1);
RSInfo rsInfo1 = new RSInfo(4527, null, 45316, (byte)103, 1);
RSInfo rsInfo2 = new RSInfo(4527, null, 0, (byte)0, 1);
RSInfo rsInfo3 = new RSInfo(0, null, -21113, (byte)98, 1);
- List<RSInfo> rsList1 = new ArrayList<RSInfo>();
- rsList1.add(rsInfo1);
+ List<RSInfo> rsList1 = newList(rsInfo1);
+ List<RSInfo> rsList2 = newList(rsInfo1, rsInfo2, rsInfo3);
- List<RSInfo> rsList2 = new ArrayList<RSInfo>();
- rsList2.add(rsInfo1);
- rsList2.add(rsInfo2);
- rsList2.add(rsInfo3);
-
- return new Object [][] {
+ return new Object[][] {
{"1a01313300323600313534363331000300020c84026c6461703a2f2f6c6461702e697" +
"06c616e65742e636f6d2f6f3d746573743f3f7375623f28736e3d4a656e73656e2900" +
"6c646170733a2f2f6c6461702e69706c616e65742e636f6d3a343034312f7569643d6" +
@@ -1098,9 +1087,9 @@
"6c6570686f6e654e756d6265720001343532370034353331360067",dsList1, rsList1},
{"1a0003343532370034353331360067343532370030000030002d32313131330062", dsList2, rsList2},
{"1a012d34333600343933002d32323738393600020101f9f70001343532370034353331360067", dsList3, rsList1},
- {"1a012d34333600343933002d32323738393600020101f9f70000", dsList3, new ArrayList<RSInfo>()},
- {"1a0001343532370034353331360067", new ArrayList<DSInfo>(), rsList1},
- {"1a0000", new ArrayList<DSInfo>(), new ArrayList<RSInfo>()},
+ {"1a012d34333600343933002d32323738393600020101f9f70000", dsList3, newList()},
+ {"1a0001343532370034353331360067", newSet(), rsList1},
+ {"1a0000", newSet(), newList()},
{"1a0434313500313436003000040102020f026c646170733a2f2f686f73743a706f727" +
"42f64633d666f6f626172313f3f7375623f28736e3d416e6f7468657220456e747279" +
"203129006c646170733a2f2f686f73743a706f72742f64633d666f6f626172323f3f7" +
@@ -1117,15 +1106,21 @@
}
@Test(dataProvider = "createTopologyData")
- public void oldTopologyPDUs(String oldPdu, List<DSInfo> dsList, List<RSInfo> rsList)
+ public void oldTopologyPDUs(String oldPdu, Set<DSInfo> dsList, List<RSInfo> rsList)
throws Exception
{
TopologyMsg msg = new TopologyMsg(hexStringToByteArray(oldPdu),
ProtocolVersion.REPLICATION_PROTOCOL_V3);
- assertEquals(msg.getDsList(), dsList);
- assertEquals(msg.getRsList(), rsList);
- BigInteger bi = new BigInteger(msg.getBytes(ProtocolVersion.REPLICATION_PROTOCOL_V3));
- assertEquals(bi.toString(16), oldPdu);
+ Assertions.assertThat(new HashSet<DSInfo>(msg.getReplicaInfos().values()))
+ .isEqualTo(dsList);
+ assertEquals(msg.getRsInfos(), rsList);
+ if (msg.getReplicaInfos().values().equals(dsList))
+ {
+ // Unfortunately this check does not work when the order of the
+ // replicaInfos collection is not exactly the same as the dsList
+ BigInteger bi = new BigInteger(msg.getBytes(ProtocolVersion.REPLICATION_PROTOCOL_V3));
+ assertEquals(bi.toString(16), oldPdu);
+ }
}
@DataProvider(name="createEntryMsgData")
@@ -1137,9 +1132,8 @@
int pos = 0;
int length = 2;
int msgid = 14;
- Object[] set1 = new Object[] {sid, dest, entryBytes, pos, length, msgid};
- return new Object [][] { set1};
+ return new Object[][] { { sid, dest, entryBytes, pos, length, msgid } };
}
/**
@@ -1186,8 +1180,7 @@
int sender = 1;
int dest = 2;
LocalizableMessage message = ERR_UNKNOWN_TYPE.get("toto");
- Object[] set1 = new Object[] {sender, dest, message};
- return new Object [][] { set1};
+ return new Object[][] { { sender, dest, message } };
}
/**
diff --git a/opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java b/opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java
index 755a1a1..c748d51 100644
--- a/opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java
+++ b/opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java
@@ -916,8 +916,8 @@
TopologyMsg msg = new TopologyMsg(dsList, rsList);
TopologyMsg newMsg = new TopologyMsg(msg.getBytes(getCurrentVersion()),
ProtocolVersion.getCurrentVersion());
- assertEquals(msg.getDsList(), newMsg.getDsList());
- assertEquals(msg.getRsList(), newMsg.getRsList());
+ assertEquals(msg.getReplicaInfos(), newMsg.getReplicaInfos());
+ assertEquals(msg.getRsInfos(), newMsg.getRsInfos());
}
/**
diff --git a/opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/AssuredReplicationServerTest.java b/opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/AssuredReplicationServerTest.java
index eaed2d3..a9a8dc0 100644
--- a/opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/AssuredReplicationServerTest.java
+++ b/opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/AssuredReplicationServerTest.java
@@ -1725,14 +1725,14 @@
private void waitForStableTopo(FakeReplicationDomain fakeRd, int expectedDs,
int expectedRs) throws Exception
{
- List<DSInfo> dsInfo = null;
+ Map<Integer, DSInfo> dsInfo = null;
List<RSInfo> rsInfo = null;
long nSec = 0;
long startTime = System.currentTimeMillis();
do
{
- dsInfo = fakeRd.getReplicasList();
- rsInfo = fakeRd.getRsList();
+ dsInfo = fakeRd.getReplicaInfos();
+ rsInfo = fakeRd.getRsInfos();
if (dsInfo.size() == expectedDs && rsInfo.size() == expectedRs)
{
debugInfo("waitForStableTopo: expected topo obtained after " + nSec + " second(s).");
@@ -3123,8 +3123,7 @@
// DS must see expected numbers of DSs/RSs
final FakeReplicationDomain fakeRd1 = fakeRDs[1];
waitForStableTopo(fakeRd1, 1, 1);
- List<DSInfo> dsInfos = fakeRd1.getReplicasList();
- DSInfo dsInfo = dsInfos.get(0);
+ DSInfo dsInfo = fakeRd1.getReplicaInfos().get(FDS2_ID);
assertEquals(dsInfo.getDsId(), FDS2_ID);
assertEquals(dsInfo.getStatus(), ServerStatus.NORMAL_STATUS);
@@ -3144,27 +3143,7 @@
}
// Wait for DS2 being degraded
- boolean error = true;
- for (int count = 0; count < 12; count++)
- {
- dsInfos = fakeRd1.getReplicasList();
- if (dsInfos == null)
- continue;
- if (dsInfos.size() == 0)
- continue;
- dsInfo = dsInfos.get(0);
- if ( (dsInfo.getDsId() == FDS2_ID) &&
- (dsInfo.getStatus() == ServerStatus.DEGRADED_STATUS) )
- {
- error = false;
- break;
- }
- else
- {
- Thread.sleep(1000);
- }
- }
- assertFalse(error, "DS2 not in degraded status");
+ expectStatusForDS(fakeRd1, ServerStatus.DEGRADED_STATUS, FDS2_ID);
Thread.sleep(500); // Sleep a while as counters are updated just after sending thread is unblocked
assertEquals(fakeRd1.getAssuredSrSentUpdates(), 4);
@@ -3226,27 +3205,7 @@
fakeRd2.startListenService();
// Wait for DS2 being back to normal
- error = true;
- for (int count = 0; count < 12; count++)
- {
- dsInfos = fakeRd1.getReplicasList();
- if (dsInfos == null)
- continue;
- if (dsInfos.size() == 0)
- continue;
- dsInfo = dsInfos.get(0);
- if ( (dsInfo.getDsId() == FDS2_ID) &&
- (dsInfo.getStatus() == ServerStatus.NORMAL_STATUS) )
- {
- error = false;
- break;
- }
- else
- {
- Thread.sleep(1000);
- }
- }
- assertFalse(error, "DS2 not back to normal status");
+ expectStatusForDS(fakeRd1, ServerStatus.NORMAL_STATUS, FDS2_ID);
// DS2 should also change status so reset its assured monitoring data so no received sr updates
assertEquals(fakeRd1.getAssuredSrSentUpdates(), 5);
@@ -3317,12 +3276,29 @@
assertEquals(fakeRd2.getReceivedUpdates(), 6);
assertEquals(fakeRd2.getWrongReceivedUpdates(), 1);
assertFalse(fakeRd2.receivedUpdatesOk());
- } finally
+ }
+ finally
{
endTest();
}
}
+ private void expectStatusForDS(final ReplicationDomain domain,
+ ServerStatus expectedStatus, int dsId) throws InterruptedException
+ {
+ for (int count = 0; count < 12; count++)
+ {
+ final DSInfo dsInfo = domain.getReplicaInfos().get(dsId);
+ if (dsInfo != null && dsInfo.getStatus() == expectedStatus)
+ {
+ return;
+ }
+ Thread.sleep(1000);
+ }
+ Assert.fail("DS(" + dsId + ") did not have expected status "
+ + expectedStatus + " after 12 seconds");
+ }
+
private void assertContainsOnly(Map<Integer, Integer> map, int key,
int expectedValue)
{
diff --git a/opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java b/opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java
index 7d4498e..0c19525 100644
--- a/opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java
+++ b/opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java
@@ -359,7 +359,7 @@
final long generationId = getGenerationId(TEST_ROOT_DN);
broker = new ReplicationBroker(new DummyReplicationDomain(generationId),
state, newFakeCfg(TEST_ROOT_DN, 3, replicationServerPort),
- generationId, getReplSessionSecurity());
+ getReplSessionSecurity());
connect(broker, replicationServerPort, 5000);
ReplicationMsg receivedMsg = broker.receive();
diff --git a/opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/service/ReplicationBrokerTest.java b/opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/service/ReplicationBrokerTest.java
new file mode 100644
index 0000000..1eace5c
--- /dev/null
+++ b/opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/service/ReplicationBrokerTest.java
@@ -0,0 +1,268 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License"). You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
+ * or http://forgerock.org/license/CDDLv1.0.html.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at legal-notices/CDDLv1_0.txt.
+ * If applicable, add the following below this CDDL HEADER, with the
+ * fields enclosed by brackets "[]" replaced with your own identifying
+ * information:
+ * Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ * Copyright 2014 ForgeRock AS
+ */
+package org.opends.server.replication.service;
+
+import java.util.*;
+
+import org.opends.server.DirectoryServerTestCase;
+import org.opends.server.replication.common.DSInfo;
+import org.opends.server.replication.common.RSInfo;
+import org.opends.server.replication.protocol.TopologyMsg;
+import org.opends.server.replication.service.ReplicationBroker.ReplicationServerInfo;
+import org.opends.server.replication.service.ReplicationBroker.Topology;
+import org.testng.Assert;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+import static java.util.Collections.*;
+
+import static org.assertj.core.api.Assertions.*;
+import static org.opends.server.TestCaseUtils.*;
+import static org.testng.Assert.*;
+
+@SuppressWarnings("javadoc")
+public class ReplicationBrokerTest extends DirectoryServerTestCase
+{
+
+ private static enum TopologyCtorToUse
+ {
+ BUILD_WITH_TOPOLOGY_MSG, BUILD_WITH_DS_RS_LISTS;
+ }
+
+ private final int CURRENT_RS_ID = 91;
+ private final int MISSING_RS_ID = 93;
+ private final int ANOTHER_RS_ID = 94;
+ private final DSInfo CURRENT_DS = dsInfo(11, CURRENT_RS_ID);
+ private final DSInfo OTHER_DS = dsInfo(12, CURRENT_RS_ID);
+ private final DSInfo MISSING_DS = dsInfo(13, CURRENT_RS_ID);
+ private final ReplicationServerInfo CURRENT_RS = rsInfo(CURRENT_RS_ID,
+ CURRENT_DS.getDsId(), OTHER_DS.getDsId());
+ private final ReplicationServerInfo MISSING_RS = rsInfo(MISSING_RS_ID,
+ MISSING_DS.getDsId());
+ private final ReplicationServerInfo ANOTHER_RS = rsInfo(ANOTHER_RS_ID);
+
+ @SuppressWarnings("unchecked")
+ private DSInfo dsInfo(int dsServerId, int rsServerId)
+ {
+ byte z = 0;
+ return new DSInfo(dsServerId, null, rsServerId, 0, null,
+ false, null, z, z, EMPTY_LIST, EMPTY_LIST, EMPTY_LIST, z);
+ }
+
+ private ReplicationServerInfo rsInfo(int rsServerId, Integer... dsIds)
+ {
+ byte z = 0;
+ final RSInfo info = new RSInfo(rsServerId, rsServerId + ":1389", 0, z, 0);
+ return new ReplicationServerInfo(info, newSet(dsIds));
+ }
+
+ private Map<Integer, ReplicationServerInfo> newMap(ReplicationServerInfo... infos)
+ {
+ if (infos.length == 0)
+ {
+ return Collections.emptyMap();
+ }
+ final Map<Integer, ReplicationServerInfo> map =
+ new HashMap<Integer, ReplicationServerInfo>();
+ for (ReplicationServerInfo info : infos)
+ {
+ map.put(info.getServerId(), info);
+ }
+ return map;
+ }
+
+ private void assertInvariants(final Topology topo)
+ {
+ assertThat(topo.replicaInfos).doesNotContainKey(CURRENT_DS.getDsId());
+ }
+
+ private ReplicationServerInfo assertContainsRSWithDSs(
+ Map<Integer, ReplicationServerInfo> rsInfos,
+ ReplicationServerInfo rsInfo, Integer... connectedDSs)
+ {
+ return assertContainsRSWithDSs(rsInfos, rsInfo, newSet(connectedDSs));
+ }
+
+ private ReplicationServerInfo assertContainsRSWithDSs(
+ Map<Integer, ReplicationServerInfo> rsInfos,
+ ReplicationServerInfo rsInfo, Set<Integer> connectedDSs)
+ {
+ final ReplicationServerInfo info = find(rsInfos, rsInfo.toRSInfo());
+ assertNotNull(info);
+ assertThat(info.getConnectedDSs()).containsAll(connectedDSs);
+ return info;
+ }
+
+ private ReplicationServerInfo find(Map<Integer, ReplicationServerInfo> rsInfos, RSInfo rsInfo)
+ {
+ for (ReplicationServerInfo info : rsInfos.values())
+ {
+ if (info.getServerId() == rsInfo.getId())
+ {
+ return info;
+ }
+ }
+ return null;
+ }
+
+ private Topology newTopology(TopologyCtorToUse toUse,
+ Map<Integer, DSInfo> replicaInfos, List<RSInfo> rsInfos, int dsServerId, int rsServerId,
+ Set<String> rsUrls, Map<Integer, ReplicationServerInfo> previousRSs)
+ {
+ if (TopologyCtorToUse.BUILD_WITH_TOPOLOGY_MSG == toUse)
+ {
+ final TopologyMsg topologyMsg = new TopologyMsg(replicaInfos.values(), rsInfos);
+ return new Topology(topologyMsg, dsServerId, rsServerId, rsUrls, previousRSs);
+ }
+ else if (TopologyCtorToUse.BUILD_WITH_DS_RS_LISTS == toUse)
+ {
+ return new Topology(replicaInfos, rsInfos, dsServerId, rsServerId, rsUrls, previousRSs);
+ }
+ Assert.fail("Do not know which Topology constructor to use: " + toUse);
+ return null;
+ }
+
+ private Map<Integer, DSInfo> newMap(DSInfo... dsInfos)
+ {
+ final Map<Integer, DSInfo> results = new HashMap<Integer, DSInfo>();
+ for (DSInfo dsInfo : dsInfos)
+ {
+ results.put(dsInfo.getDsId(), dsInfo);
+ }
+ return results;
+ }
+
+ @DataProvider
+ public Object[][] topologyCtorProvider() {
+ return new Object[][] { { TopologyCtorToUse.BUILD_WITH_TOPOLOGY_MSG },
+ { TopologyCtorToUse.BUILD_WITH_DS_RS_LISTS } };
+ }
+
+ @Test(dataProvider = "topologyCtorProvider")
+ @SuppressWarnings("unchecked")
+ public void topologyShouldContainNothing(TopologyCtorToUse toUse)
+ throws Exception
+ {
+ final Topology topo = newTopology(toUse,
+ EMPTY_MAP, EMPTY_LIST,
+ CURRENT_DS.getDsId(), CURRENT_RS.getServerId(), EMPTY_SET, EMPTY_MAP);
+ assertInvariants(topo);
+ assertThat(topo.rsInfos).isEmpty();
+ }
+
+ @Test
+ @SuppressWarnings("unchecked")
+ public void topologyShouldFilterOutCurrentDS()
+ {
+ final Topology topo = newTopology(TopologyCtorToUse.BUILD_WITH_TOPOLOGY_MSG,
+ newMap(OTHER_DS, CURRENT_DS), EMPTY_LIST,
+ CURRENT_DS.getDsId(), CURRENT_RS.getServerId(), EMPTY_SET, EMPTY_MAP);
+ assertInvariants(topo);
+ assertThat(topo.rsInfos).isEmpty();
+ }
+
+ @Test(dataProvider = "topologyCtorProvider")
+ @SuppressWarnings("unchecked")
+ public void topologyShouldContainRSWithoutOtherDS(TopologyCtorToUse toUse)
+ {
+ final Topology topo = newTopology(toUse,
+ newMap(OTHER_DS), newList(CURRENT_RS.toRSInfo()),
+ CURRENT_DS.getDsId(), CURRENT_RS.getServerId(), EMPTY_SET, EMPTY_MAP);
+ assertInvariants(topo);
+ assertThat(topo.rsInfos).hasSize(1);
+ assertContainsRSWithDSs(topo.rsInfos, CURRENT_RS, CURRENT_DS.getDsId());
+ }
+
+ @Test
+ @SuppressWarnings("unchecked")
+ public void topologyShouldContainRSWithAllDSs_buildWithTopologyMsg()
+ {
+ final Topology topo = newTopology(TopologyCtorToUse.BUILD_WITH_TOPOLOGY_MSG,
+ newMap(CURRENT_DS, OTHER_DS), newList(CURRENT_RS.toRSInfo()),
+ CURRENT_DS.getDsId(), CURRENT_RS.getServerId(), EMPTY_SET, EMPTY_MAP);
+ assertInvariants(topo);
+ assertThat(topo.rsInfos).hasSize(1);
+ assertContainsRSWithDSs(topo.rsInfos, CURRENT_RS, CURRENT_RS.getConnectedDSs());
+ }
+
+ @Test(dataProvider = "topologyCtorProvider")
+ @SuppressWarnings("unchecked")
+ public void topologyShouldStillContainRS(TopologyCtorToUse toUse) throws Exception
+ {
+ final Map<Integer, ReplicationServerInfo> previousRSs = newMap(CURRENT_RS);
+ final Topology topo = newTopology(toUse,
+ newMap(OTHER_DS), newList(CURRENT_RS.toRSInfo()),
+ CURRENT_DS.getDsId(), CURRENT_RS.getServerId(), EMPTY_SET, previousRSs);
+ assertInvariants(topo);
+ assertThat(topo.rsInfos).hasSize(1);
+ assertContainsRSWithDSs(topo.rsInfos, CURRENT_RS, CURRENT_RS.getConnectedDSs());
+ }
+
+ @Test(dataProvider = "topologyCtorProvider")
+ @SuppressWarnings("unchecked")
+ public void topologyShouldStillContainRSWithNewlyProvidedDSs(TopologyCtorToUse toUse)
+ {
+ final ReplicationServerInfo CURRENT_RS_WITHOUT_DS = rsInfo(CURRENT_RS_ID);
+ final Map<Integer, ReplicationServerInfo> previousRSs = newMap(CURRENT_RS_WITHOUT_DS);
+ final Topology topo = newTopology(toUse,
+ newMap(OTHER_DS), newList(CURRENT_RS.toRSInfo()),
+ CURRENT_DS.getDsId(), CURRENT_RS.getServerId(), EMPTY_SET, previousRSs);
+ assertInvariants(topo);
+ assertThat(topo.rsInfos).hasSize(1);
+ assertContainsRSWithDSs(topo.rsInfos, CURRENT_RS, CURRENT_DS.getDsId(), OTHER_DS.getDsId());
+ }
+
+ @Test(dataProvider = "topologyCtorProvider")
+ @SuppressWarnings("unchecked")
+ public void topologyShouldHaveRemovedMissingRS(TopologyCtorToUse toUse)
+ {
+ final Map<Integer, ReplicationServerInfo> previousRSs = newMap(CURRENT_RS, MISSING_RS);
+ final Topology topo = newTopology(toUse,
+ newMap(OTHER_DS), newList(CURRENT_RS.toRSInfo()),
+ CURRENT_DS.getDsId(), CURRENT_RS.getServerId(), EMPTY_SET, previousRSs);
+ assertInvariants(topo);
+ assertThat(topo.rsInfos).hasSize(1);
+ assertContainsRSWithDSs(topo.rsInfos, CURRENT_RS, CURRENT_RS.getConnectedDSs());
+ }
+
+ @Test
+ @SuppressWarnings("unchecked")
+ public void topologyShouldHaveStampedLocallyConfiguredRSs_buildWithDsRsLists()
+ {
+ final Set<String> locallyConfigured = newSet(CURRENT_RS.getServerURL());
+ final Topology topo = newTopology(TopologyCtorToUse.BUILD_WITH_DS_RS_LISTS,
+ newMap(OTHER_DS), newList(CURRENT_RS.toRSInfo(), ANOTHER_RS.toRSInfo()),
+ CURRENT_DS.getDsId(), CURRENT_RS.getServerId(), locallyConfigured, EMPTY_MAP);
+ assertInvariants(topo);
+ assertThat(topo.rsInfos).hasSize(2);
+ ReplicationServerInfo currentRS =
+ assertContainsRSWithDSs(topo.rsInfos, CURRENT_RS, CURRENT_RS.getConnectedDSs());
+ ReplicationServerInfo anotherRS =
+ assertContainsRSWithDSs(topo.rsInfos, ANOTHER_RS);
+ assertThat(currentRS.isLocallyConfigured()).isTrue();
+ assertThat(anotherRS.isLocallyConfigured()).isFalse();
+ }
+
+}
diff --git a/opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/service/ReplicationDomainTest.java b/opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/service/ReplicationDomainTest.java
index 33da154..1f612fe 100644
--- a/opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/service/ReplicationDomainTest.java
+++ b/opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/service/ReplicationDomainTest.java
@@ -26,12 +26,16 @@
*/
package org.opends.server.replication.service;
-import java.util.*;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.SortedSet;
+import java.util.TreeSet;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.opends.server.TestCaseUtils;
+import org.opends.server.backends.task.Task;
import org.opends.server.replication.ReplicationTestCase;
import org.opends.server.replication.common.DSInfo;
import org.opends.server.replication.common.RSInfo;
@@ -42,6 +46,7 @@
import org.opends.server.replication.server.ReplicationServer;
import org.opends.server.replication.service.ReplicationDomain.IEContext;
import org.opends.server.types.DN;
+import org.opends.server.types.DirectoryException;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
@@ -54,6 +59,8 @@
@SuppressWarnings("javadoc")
public class ReplicationDomainTest extends ReplicationTestCase
{
+ private static final Task NO_INIT_TASK = null;
+
@DataProvider(name = "publishAndReceiveData")
public Object[][] createpublishAndReceiveData()
{
@@ -116,14 +123,14 @@
assertNotNull(rcvdMsg);
assertEquals(test, rcvdMsg.getPayload());
- for (RSInfo replServerInfo : domain1.getRsList())
+ for (RSInfo replServerInfo : domain1.getRsInfos())
{
// The generation Id of the remote should be 1
assertEquals(replServerInfo.getGenerationId(), 1,
"Unexpected value of generationId in RSInfo for RS=" + replServerInfo);
}
- for (DSInfo serverInfo : domain1.getReplicasList())
+ for (DSInfo serverInfo : domain1.getReplicaInfos().values())
{
assertEquals(serverInfo.getStatus(), ServerStatus.NORMAL_STATUS);
}
@@ -132,7 +139,7 @@
domain1.resetReplicationLog();
Thread.sleep(500);
- for (RSInfo replServerInfo : domain1.getRsList())
+ for (RSInfo replServerInfo : domain1.getRsInfos())
{
// The generation Id of the remote should now be 2
assertEquals(replServerInfo.getGenerationId(), 2,
@@ -144,9 +151,9 @@
{
try
{
- assertExpectedServerStatuses(domain1.getReplicasList(),
+ assertExpectedServerStatuses(domain1.getReplicaInfos(),
domain1ServerId, domain2ServerId);
- assertExpectedServerStatuses(domain2.getReplicasList(),
+ assertExpectedServerStatuses(domain2.getReplicaInfos(),
domain1ServerId, domain2ServerId);
Map<Integer, ServerState> states1 = domain1.getReplicaStates();
@@ -178,13 +185,15 @@
}
}
- private void assertExpectedServerStatuses(List<DSInfo> dsInfos,
+ private void assertExpectedServerStatuses(Map<Integer, DSInfo> dsInfos,
int domain1ServerId, int domain2ServerId)
{
- for (DSInfo serverInfo : dsInfos)
+ for (DSInfo serverInfo : dsInfos.values())
{
if (serverInfo.getDsId() == domain2ServerId)
+ {
assertEquals(serverInfo.getStatus(), ServerStatus.BAD_GEN_ID_STATUS);
+ }
else
{
assertEquals(serverInfo.getDsId(), domain1ServerId);
@@ -330,15 +339,7 @@
* Trigger a total update from domain1 to domain2.
* Check that the exported data is correctly received on domain2.
*/
- for (DSInfo remoteDS : domain2.getReplicasList())
- {
- if (remoteDS.getDsId() != domain2.getServerId())
- {
- domain2.initializeFromRemote(remoteDS.getDsId());
- break;
- }
- }
-
+ assertTrue(initializeFromRemote(domain2));
waitEndExport(exportedData, importedData);
assertExportSucessful(domain1, domain2, exportedData, importedData);
}
@@ -349,6 +350,19 @@
}
}
+ private boolean initializeFromRemote(ReplicationDomain domain) throws DirectoryException
+ {
+ for (DSInfo remoteDS : domain.getReplicaInfos().values())
+ {
+ if (remoteDS.getDsId() != domain.getServerId())
+ {
+ domain.initializeFromRemote(remoteDS.getDsId(), NO_INIT_TASK);
+ return true;
+ }
+ }
+ return false;
+ }
+
/**
* Test that a ReplicationDomain is able to export and import its database
* across 2 replication servers.
@@ -387,7 +401,7 @@
domain2 = new FakeReplicationDomain(
testService, 2, servers2, 0, null, importedData, 0);
- domain2.initializeFromRemote(1);
+ domain2.initializeFromRemote(1, NO_INIT_TASK);
waitEndExport(exportedData, importedData);
assertExportSucessful(domain1, domain2, exportedData, importedData);
@@ -517,23 +531,10 @@
* Trigger a total update from domain1 to domain2.
* Check that the exported data is correctly received on domain2.
*/
- boolean alone = true;
- while (alone)
+ while (!initializeFromRemote(domain1))
{
- for (DSInfo remoteDS : domain1.getReplicasList())
- {
- if (remoteDS.getDsId() != domain1.getServerId())
- {
- alone = false;
- domain1.initializeFromRemote(remoteDS.getDsId() , null);
- break;
- }
- }
- if (alone)
- {
- System.out.println("trying...");
- Thread.sleep(1000);
- }
+ System.out.println("trying...");
+ Thread.sleep(1000);
}
System.out.println("waiting");
Thread.sleep(10000000);
--
Gitblit v1.10.0