| | |
| | | import org.opends.server.replication.common.ServerStatus; |
| | | |
| | | /** |
| | | * |
| | | * This class defines a message that is sent: |
| | | * - By a RS to the other RSs in the topology, containing: |
| | | * - the list of DSs directly connected to the RS in the DS list |
| | | * - only this RS in the RS list |
| | | * - the DSs directly connected to the RS in the DS infos |
| | | * - only this RS in the RS infos |
| | | * - By a RS to his connected DSs, containing every DSs and RSs he knows. |
| | | * In that case the message contains: |
| | | * - the list of every DS the RS knows except the destinator DS in the DS list |
| | | * - the list of every connected RSs (including the sending RS) in the RS list |
| | | * - every DSs the RS knows except the destinator DS in the DS infos |
| | | * - every connected RSs (including the sending RS) in the RS infos |
| | | * |
| | | * Exchanging these messages allows to have each RS or DS take |
| | | * appropriate decisions according to the current topology: |
| | |
| | | */ |
| | | public class TopologyMsg extends ReplicationMsg |
| | | { |
| | | // Information for the DS known in the topology |
| | | private final List<DSInfo> dsList; |
| | | // Information for the RS known in the topology |
| | | private final List<RSInfo> rsList; |
| | | /** Information for the DSs (aka replicas) known in the topology. */ |
| | | private final Map<Integer, DSInfo> replicaInfos; |
| | | /** Information for the RSs known in the topology. */ |
| | | private final List<RSInfo> rsInfos; |
| | | |
| | | /** |
| | | * Creates a new changelogInfo message from its encoded form. |
| | |
| | | if (in.length < 1 || in[0] != MSG_TYPE_TOPOLOGY) |
| | | { |
| | | throw new DataFormatException( |
| | | "Input is not a valid " + this.getClass().getCanonicalName()); |
| | | "Input is not a valid " + getClass().getCanonicalName()); |
| | | } |
| | | |
| | | int pos = 1; |
| | | |
| | | /* Read number of following DS info entries */ |
| | | |
| | | byte nDsInfo = in[pos++]; |
| | | |
| | | /* Read the DS info entries */ |
| | | List<DSInfo> dsList = new ArrayList<DSInfo>(Math.max(0, nDsInfo)); |
| | | while ( (nDsInfo > 0) && (pos < in.length) ) |
| | | Map<Integer, DSInfo> replicaInfos = |
| | | new HashMap<Integer, DSInfo>(Math.max(0, nDsInfo)); |
| | | while (nDsInfo > 0 && pos < in.length) |
| | | { |
| | | /* Read DS id */ |
| | | int length = getNextLength(in, pos); |
| | |
| | | } |
| | | |
| | | /* Read RS id */ |
| | | length = |
| | | getNextLength(in, pos); |
| | | serverIdString = |
| | | new String(in, pos, length, "UTF-8"); |
| | | length = getNextLength(in, pos); |
| | | serverIdString = new String(in, pos, length, "UTF-8"); |
| | | int rsId = Integer.valueOf(serverIdString); |
| | | pos += length + 1; |
| | | |
| | | /* Read the generation id */ |
| | | length = getNextLength(in, pos); |
| | | long generationId = |
| | | Long.valueOf(new String(in, pos, length, |
| | | "UTF-8")); |
| | | long generationId = Long.valueOf(new String(in, pos, length, "UTF-8")); |
| | | pos += length + 1; |
| | | |
| | | /* Read DS status */ |
| | | ServerStatus status = ServerStatus.valueOf(in[pos++]); |
| | | |
| | | /* Read DS assured flag */ |
| | | boolean assuredFlag; |
| | | assuredFlag = in[pos++] == 1; |
| | | boolean assuredFlag = in[pos++] == 1; |
| | | |
| | | /* Read DS assured mode */ |
| | | AssuredMode assuredMode = AssuredMode.valueOf(in[pos++]); |
| | |
| | | |
| | | /* Read number of referrals URLs */ |
| | | List<String> refUrls = new ArrayList<String>(); |
| | | byte nUrls = in[pos++]; |
| | | byte nRead = 0; |
| | | /* Read urls until expected number read */ |
| | | while ((nRead != nUrls) && |
| | | (pos < in.length) //security |
| | | ) |
| | | { |
| | | length = getNextLength(in, pos); |
| | | String url = new String(in, pos, length, "UTF-8"); |
| | | refUrls.add(url); |
| | | pos += length + 1; |
| | | nRead++; |
| | | } |
| | | pos = readStrings(in, pos, refUrls); |
| | | |
| | | Set<String> attrs = new HashSet<String>(); |
| | | Set<String> delattrs = new HashSet<String>(); |
| | | short protocolVersion = -1; |
| | | if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V4) |
| | | { |
| | | byte nAttrs = in[pos++]; |
| | | nRead = 0; |
| | | /* Read attrs until expected number read */ |
| | | while ((nRead != nAttrs) && (pos < in.length)) |
| | | { |
| | | length = getNextLength(in, pos); |
| | | String attr = new String(in, pos, length, "UTF-8"); |
| | | attrs.add(attr); |
| | | pos += length + 1; |
| | | nRead++; |
| | | } |
| | | pos = readStrings(in, pos, attrs); |
| | | |
| | | if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V5) |
| | | { |
| | | nAttrs = in[pos++]; |
| | | nRead = 0; |
| | | /* Read attrs until expected number read */ |
| | | while ((nRead != nAttrs) && (pos < in.length)) |
| | | { |
| | | length = getNextLength(in, pos); |
| | | String attr = new String(in, pos, length, "UTF-8"); |
| | | delattrs.add(attr); |
| | | pos += length + 1; |
| | | nRead++; |
| | | } |
| | | pos = readStrings(in, pos, delattrs); |
| | | } |
| | | else |
| | | { |
| | |
| | | } |
| | | |
| | | /* Read Protocol version */ |
| | | protocolVersion = (short)in[pos++]; |
| | | protocolVersion = in[pos++]; |
| | | } |
| | | |
| | | /* Now create DSInfo and store it in list */ |
| | | |
| | | DSInfo dsInfo = new DSInfo(dsId, dsUrl, rsId, generationId, status, |
| | | assuredFlag, assuredMode, safeDataLevel, groupId, refUrls, attrs, |
| | | delattrs, protocolVersion); |
| | | dsList.add(dsInfo); |
| | | /* Now create DSInfo and store it */ |
| | | replicaInfos.put(dsId, new DSInfo(dsId, dsUrl, rsId, generationId, |
| | | status, assuredFlag, assuredMode, safeDataLevel, groupId, refUrls, |
| | | attrs, delattrs, protocolVersion)); |
| | | |
| | | nDsInfo--; |
| | | } |
| | | |
| | | /* Read number of following RS info entries */ |
| | | |
| | | byte nRsInfo = in[pos++]; |
| | | |
| | | /* Read the RS info entries */ |
| | | List<RSInfo> rsList = new ArrayList<RSInfo>(Math.max(0, nRsInfo)); |
| | | while ( (nRsInfo > 0) && (pos < in.length) ) |
| | | List<RSInfo> rsInfos = new ArrayList<RSInfo>(Math.max(0, nRsInfo)); |
| | | while (nRsInfo > 0 && pos < in.length) |
| | | { |
| | | /* Read RS id */ |
| | | int length = getNextLength(in, pos); |
| | |
| | | |
| | | /* Read the generation id */ |
| | | length = getNextLength(in, pos); |
| | | long generationId = |
| | | Long.valueOf(new String(in, pos, length, |
| | | "UTF-8")); |
| | | long generationId = Long.valueOf(new String(in, pos, length, "UTF-8")); |
| | | pos += length + 1; |
| | | |
| | | /* Read RS group id */ |
| | |
| | | pos += length + 1; |
| | | } |
| | | |
| | | /* Now create RSInfo and store it in list */ |
| | | |
| | | RSInfo rsInfo = new RSInfo(id, serverUrl, generationId, groupId, |
| | | weight); |
| | | rsList.add(rsInfo); |
| | | /* Now create RSInfo and store it */ |
| | | rsInfos.add(new RSInfo(id, serverUrl, generationId, groupId, weight)); |
| | | |
| | | nRsInfo--; |
| | | } |
| | | |
| | | this.dsList = Collections.unmodifiableList(dsList); |
| | | this.rsList = Collections.unmodifiableList(rsList); |
| | | } catch (UnsupportedEncodingException e) |
| | | this.replicaInfos = Collections.unmodifiableMap(replicaInfos); |
| | | this.rsInfos = Collections.unmodifiableList(rsInfos); |
| | | } |
| | | catch (UnsupportedEncodingException e) |
| | | { |
| | | throw new DataFormatException("UTF-8 is not supported by this jvm."); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Creates a new message from a list of the currently connected servers. |
| | | * |
| | | * @param dsList The list of currently connected DS servers ID. |
| | | * @param rsList The list of currently connected RS servers ID. |
| | | */ |
| | | public TopologyMsg(List<DSInfo> dsList, List<RSInfo> rsList) |
| | | private int readStrings(byte[] in, int pos, Collection<String> outputCol) |
| | | throws DataFormatException, UnsupportedEncodingException |
| | | { |
| | | if (dsList == null || dsList.isEmpty()) |
| | | byte nAttrs = in[pos++]; |
| | | byte nRead = 0; |
| | | // Read all elements until expected number read |
| | | while (nRead != nAttrs && pos < in.length) |
| | | { |
| | | this.dsList = Collections.emptyList(); |
| | | int length = getNextLength(in, pos); |
| | | outputCol.add(new String(in, pos, length, "UTF-8")); |
| | | pos += length + 1; |
| | | nRead++; |
| | | } |
| | | return pos; |
| | | } |
| | | |
| | | /** |
| | | * Creates a new message of the currently connected servers. |
| | | * |
| | | * @param dsInfos The collection of currently connected DS servers ID. |
| | | * @param rsInfos The list of currently connected RS servers ID. |
| | | */ |
| | | public TopologyMsg(Collection<DSInfo> dsInfos, List<RSInfo> rsInfos) |
| | | { |
| | | if (dsInfos == null || dsInfos.isEmpty()) |
| | | { |
| | | this.replicaInfos = Collections.emptyMap(); |
| | | } |
| | | else |
| | | { |
| | | this.dsList = Collections.unmodifiableList(new ArrayList<DSInfo>(dsList)); |
| | | Map<Integer, DSInfo> replicas = new HashMap<Integer, DSInfo>(); |
| | | for (DSInfo dsInfo : dsInfos) |
| | | { |
| | | replicas.put(dsInfo.getDsId(), dsInfo); |
| | | } |
| | | this.replicaInfos = Collections.unmodifiableMap(replicas); |
| | | } |
| | | |
| | | if (rsList == null || rsList.isEmpty()) |
| | | if (rsInfos == null || rsInfos.isEmpty()) |
| | | { |
| | | this.rsList = Collections.emptyList(); |
| | | this.rsInfos = Collections.emptyList(); |
| | | } |
| | | else |
| | | { |
| | | this.rsList = Collections.unmodifiableList(new ArrayList<RSInfo>(rsList)); |
| | | this.rsInfos = |
| | | Collections.unmodifiableList(new ArrayList<RSInfo>(rsInfos)); |
| | | } |
| | | } |
| | | |
| | |
| | | // Msg encoding |
| | | // ============ |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public byte[] getBytes(short version) |
| | | throws UnsupportedEncodingException |
| | | public byte[] getBytes(short version) throws UnsupportedEncodingException |
| | | { |
| | | try |
| | | { |
| | |
| | | oStream.write(MSG_TYPE_TOPOLOGY); |
| | | |
| | | // Put number of following DS info entries |
| | | oStream.write((byte)dsList.size()); |
| | | oStream.write((byte) replicaInfos.size()); |
| | | |
| | | // Put DS info |
| | | for (DSInfo dsInfo : dsList) |
| | | for (DSInfo dsInfo : replicaInfos.values()) |
| | | { |
| | | // Put DS id |
| | | byte[] byteServerId = |
| | |
| | | oStream.write(0); |
| | | } |
| | | // Put RS id |
| | | byteServerId = |
| | | String.valueOf(dsInfo.getRsId()).getBytes("UTF-8"); |
| | | byteServerId = String.valueOf(dsInfo.getRsId()).getBytes("UTF-8"); |
| | | oStream.write(byteServerId); |
| | | oStream.write(0); |
| | | // Put the generation id |
| | |
| | | // Put DS group id |
| | | oStream.write(dsInfo.getGroupId()); |
| | | |
| | | List<String> refUrls = dsInfo.getRefUrls(); |
| | | // Put number of following URLs as a byte |
| | | oStream.write(refUrls.size()); |
| | | for (String url : refUrls) |
| | | { |
| | | // Write the url and a 0 terminating byte |
| | | oStream.write(url.getBytes("UTF-8")); |
| | | oStream.write(0); |
| | | } |
| | | writeStrings(oStream, dsInfo.getRefUrls()); |
| | | |
| | | if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V4) |
| | | { |
| | | // Put ECL includes |
| | | Set<String> attrs = dsInfo.getEclIncludes(); |
| | | oStream.write(attrs.size()); |
| | | for (String attr : attrs) |
| | | { |
| | | oStream.write(attr.getBytes("UTF-8")); |
| | | oStream.write(0); |
| | | } |
| | | writeStrings(oStream, dsInfo.getEclIncludes()); |
| | | |
| | | if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V5) |
| | | { |
| | | Set<String> delattrs = dsInfo.getEclIncludesForDeletes(); |
| | | oStream.write(delattrs.size()); |
| | | for (String attr : delattrs) |
| | | { |
| | | oStream.write(attr.getBytes("UTF-8")); |
| | | oStream.write(0); |
| | | } |
| | | writeStrings(oStream, dsInfo.getEclIncludesForDeletes()); |
| | | } |
| | | |
| | | oStream.write(dsInfo.getProtocolVersion()); |
| | |
| | | } |
| | | |
| | | // Put number of following RS info entries |
| | | oStream.write((byte)rsList.size()); |
| | | oStream.write((byte) rsInfos.size()); |
| | | |
| | | // Put RS info |
| | | for (RSInfo rsInfo : rsList) |
| | | for (RSInfo rsInfo : rsInfos) |
| | | { |
| | | // Put RS id |
| | | byte[] byteServerId = |
| | |
| | | // never happens |
| | | throw new RuntimeException(e); |
| | | } |
| | | |
| | | } |
| | | |
| | | private void writeStrings(ByteArrayOutputStream oStream, |
| | | Collection<String> col) throws IOException, UnsupportedEncodingException |
| | | { |
| | | // Put collection length as a byte |
| | | oStream.write(col.size()); |
| | | for (String elem : col) |
| | | { |
| | | // Write the element and a 0 terminating byte |
| | | oStream.write(elem.getBytes("UTF-8")); |
| | | oStream.write(0); |
| | | } |
| | | } |
| | | |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public String toString() |
| | | { |
| | | String dsStr = ""; |
| | | for (DSInfo dsInfo : dsList) |
| | | for (DSInfo dsInfo : replicaInfos.values()) |
| | | { |
| | | dsStr += dsInfo.toString() + "\n----------------------------\n"; |
| | | dsStr += dsInfo + "\n----------------------------\n"; |
| | | } |
| | | |
| | | String rsStr = ""; |
| | | for (RSInfo rsInfo : rsList) |
| | | for (RSInfo rsInfo : rsInfos) |
| | | { |
| | | rsStr += rsInfo.toString() + "\n----------------------------\n"; |
| | | rsStr += rsInfo + "\n----------------------------\n"; |
| | | } |
| | | |
| | | return ("TopologyMsg content: " |
| | | return "TopologyMsg content:" |
| | | + "\n----------------------------" |
| | | + "\nCONNECTED DS SERVERS:" |
| | | + "\n--------------------\n" |
| | | + dsStr |
| | | + "CONNECTED RS SERVERS:" |
| | | + "\n--------------------\n" |
| | | + rsStr + (rsStr.equals("") ? "----------------------------\n" : "")); |
| | | + rsStr |
| | | + (rsStr.equals("") ? "----------------------------\n" : ""); |
| | | } |
| | | |
| | | /** |
| | | * Get the list of DS info. |
| | | * @return The list of DS info |
| | | * Get the DS infos. |
| | | * |
| | | * @return The DS infos |
| | | */ |
| | | public List<DSInfo> getDsList() |
| | | public Map<Integer, DSInfo> getReplicaInfos() |
| | | { |
| | | return dsList; |
| | | return replicaInfos; |
| | | } |
| | | |
| | | /** |
| | | * Get the list of RS info. |
| | | * @return The list of RS info |
| | | * Get the RS infos. |
| | | * |
| | | * @return The RS infos |
| | | */ |
| | | public List<RSInfo> getRsList() |
| | | public List<RSInfo> getRsInfos() |
| | | { |
| | | return rsList; |
| | | return rsInfos; |
| | | } |
| | | } |