| | |
| | | */ |
| | | package org.opends.server.replication.protocol; |
| | | |
| | | import java.io.ByteArrayOutputStream; |
| | | import java.io.IOException; |
| | | import java.io.UnsupportedEncodingException; |
| | | import java.util.*; |
| | | import java.util.zip.DataFormatException; |
| | | |
| | |
| | | import org.opends.server.replication.common.RSInfo; |
| | | import org.opends.server.replication.common.ServerStatus; |
| | | |
| | | import static org.opends.server.replication.protocol.ProtocolVersion.*; |
| | | |
| | | /** |
| | | * This class defines a message that is sent: |
| | | * - By a RS to the other RSs in the topology, containing: |
| | |
| | | * @throws java.util.zip.DataFormatException If the byte array does not |
| | | * contain a valid encoded form of the message. |
| | | */ |
| | | public TopologyMsg(byte[] in, short version) throws DataFormatException |
| | | TopologyMsg(byte[] in, short version) throws DataFormatException |
| | | { |
| | | try |
| | | final ByteArrayScanner scanner = new ByteArrayScanner(in); |
| | | final byte msgType = scanner.nextByte(); |
| | | if (msgType != MSG_TYPE_TOPOLOGY) |
| | | { |
| | | /* First byte is the type */ |
| | | if (in.length < 1 || in[0] != MSG_TYPE_TOPOLOGY) |
| | | { |
| | | throw new DataFormatException( |
| | | "Input is not a valid " + getClass().getCanonicalName()); |
| | | } |
| | | |
| | | int pos = 1; |
| | | |
| | | /* Read number of following DS info entries */ |
| | | byte nDsInfo = in[pos++]; |
| | | |
| | | /* Read the DS info entries */ |
| | | Map<Integer, DSInfo> replicaInfos = |
| | | new HashMap<Integer, DSInfo>(Math.max(0, nDsInfo)); |
| | | while (nDsInfo > 0 && pos < in.length) |
| | | { |
| | | /* Read DS id */ |
| | | int length = getNextLength(in, pos); |
| | | String serverIdString = new String(in, pos, length, "UTF-8"); |
| | | int dsId = Integer.valueOf(serverIdString); |
| | | pos += length + 1; |
| | | |
| | | /* Read DS URL */ |
| | | String dsUrl; |
| | | if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V6) |
| | | { |
| | | length = getNextLength(in, pos); |
| | | dsUrl = new String(in, pos, length, "UTF-8"); |
| | | pos += length + 1; |
| | | } |
| | | else |
| | | { |
| | | dsUrl = ""; |
| | | } |
| | | |
| | | /* Read RS id */ |
| | | length = getNextLength(in, pos); |
| | | serverIdString = new String(in, pos, length, "UTF-8"); |
| | | int rsId = Integer.valueOf(serverIdString); |
| | | pos += length + 1; |
| | | |
| | | /* Read the generation id */ |
| | | length = getNextLength(in, pos); |
| | | long generationId = Long.valueOf(new String(in, pos, length, "UTF-8")); |
| | | pos += length + 1; |
| | | |
| | | /* Read DS status */ |
| | | ServerStatus status = ServerStatus.valueOf(in[pos++]); |
| | | |
| | | /* Read DS assured flag */ |
| | | boolean assuredFlag = in[pos++] == 1; |
| | | |
| | | /* Read DS assured mode */ |
| | | AssuredMode assuredMode = AssuredMode.valueOf(in[pos++]); |
| | | |
| | | /* Read DS safe data level */ |
| | | byte safeDataLevel = in[pos++]; |
| | | |
| | | /* Read DS group id */ |
| | | byte groupId = in[pos++]; |
| | | |
| | | /* Read number of referrals URLs */ |
| | | List<String> refUrls = new ArrayList<String>(); |
| | | pos = readStrings(in, pos, refUrls); |
| | | |
| | | Set<String> attrs = new HashSet<String>(); |
| | | Set<String> delattrs = new HashSet<String>(); |
| | | short protocolVersion = -1; |
| | | if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V4) |
| | | { |
| | | pos = readStrings(in, pos, attrs); |
| | | |
| | | if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V5) |
| | | { |
| | | pos = readStrings(in, pos, delattrs); |
| | | } |
| | | else |
| | | { |
| | | // Default to using the same set of attributes for deletes. |
| | | delattrs.addAll(attrs); |
| | | } |
| | | |
| | | /* Read Protocol version */ |
| | | protocolVersion = in[pos++]; |
| | | } |
| | | |
| | | /* Now create DSInfo and store it */ |
| | | replicaInfos.put(dsId, new DSInfo(dsId, dsUrl, rsId, generationId, |
| | | status, assuredFlag, assuredMode, safeDataLevel, groupId, refUrls, |
| | | attrs, delattrs, protocolVersion)); |
| | | |
| | | nDsInfo--; |
| | | } |
| | | |
| | | /* Read number of following RS info entries */ |
| | | byte nRsInfo = in[pos++]; |
| | | |
| | | /* Read the RS info entries */ |
| | | List<RSInfo> rsInfos = new ArrayList<RSInfo>(Math.max(0, nRsInfo)); |
| | | while (nRsInfo > 0 && pos < in.length) |
| | | { |
| | | /* Read RS id */ |
| | | int length = getNextLength(in, pos); |
| | | String serverIdString = new String(in, pos, length, "UTF-8"); |
| | | int id = Integer.valueOf(serverIdString); |
| | | pos += length + 1; |
| | | |
| | | /* Read the generation id */ |
| | | length = getNextLength(in, pos); |
| | | long generationId = Long.valueOf(new String(in, pos, length, "UTF-8")); |
| | | pos += length + 1; |
| | | |
| | | /* Read RS group id */ |
| | | byte groupId = in[pos++]; |
| | | |
| | | int weight = 1; |
| | | String serverUrl = null; |
| | | if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V4) |
| | | { |
| | | length = getNextLength(in, pos); |
| | | serverUrl = new String(in, pos, length, "UTF-8"); |
| | | pos += length + 1; |
| | | |
| | | /* Read RS weight */ |
| | | length = getNextLength(in, pos); |
| | | weight = Integer.valueOf(new String(in, pos, length, "UTF-8")); |
| | | pos += length + 1; |
| | | } |
| | | |
| | | /* Now create RSInfo and store it */ |
| | | rsInfos.add(new RSInfo(id, serverUrl, generationId, groupId, weight)); |
| | | |
| | | nRsInfo--; |
| | | } |
| | | |
| | | this.replicaInfos = Collections.unmodifiableMap(replicaInfos); |
| | | this.rsInfos = Collections.unmodifiableList(rsInfos); |
| | | throw new DataFormatException("Input is not a valid " |
| | | + getClass().getCanonicalName()); |
| | | } |
| | | catch (UnsupportedEncodingException e) |
| | | |
| | | // Read the DS info entries, first read number of them |
| | | int nDsInfo = scanner.nextByte(); |
| | | final Map<Integer, DSInfo> replicaInfos = |
| | | new HashMap<Integer, DSInfo>(Math.max(0, nDsInfo)); |
| | | while (nDsInfo > 0 && !scanner.isEmpty()) |
| | | { |
| | | throw new DataFormatException("UTF-8 is not supported by this jvm."); |
| | | final DSInfo dsInfo = nextDSInfo(scanner, version); |
| | | replicaInfos.put(dsInfo.getDsId(), dsInfo); |
| | | nDsInfo--; |
| | | } |
| | | |
| | | // Read the RS info entries |
| | | int nRsInfo = scanner.nextByte(); |
| | | final List<RSInfo> rsInfos = new ArrayList<RSInfo>(Math.max(0, nRsInfo)); |
| | | while (nRsInfo > 0 && !scanner.isEmpty()) |
| | | { |
| | | rsInfos.add(nextRSInfo(scanner, version)); |
| | | nRsInfo--; |
| | | } |
| | | |
| | | this.replicaInfos = Collections.unmodifiableMap(replicaInfos); |
| | | this.rsInfos = Collections.unmodifiableList(rsInfos); |
| | | } |
| | | |
| | | private int readStrings(byte[] in, int pos, Collection<String> outputCol) |
| | | throws DataFormatException, UnsupportedEncodingException |
| | | private DSInfo nextDSInfo(ByteArrayScanner scanner, short version) |
| | | throws DataFormatException |
| | | { |
| | | byte nAttrs = in[pos++]; |
| | | byte nRead = 0; |
| | | // Read all elements until expected number read |
| | | while (nRead != nAttrs && pos < in.length) |
| | | final int dsId = scanner.nextIntUTF8(); |
| | | final String dsUrl = |
| | | version < REPLICATION_PROTOCOL_V6 ? "" : scanner.nextString(); |
| | | final int rsId = scanner.nextIntUTF8(); |
| | | final long generationId = scanner.nextLongUTF8(); |
| | | final ServerStatus status = ServerStatus.valueOf(scanner.nextByte()); |
| | | final boolean assuredFlag = scanner.nextBoolean(); |
| | | final AssuredMode assuredMode = AssuredMode.valueOf(scanner.nextByte()); |
| | | final byte safeDataLevel = scanner.nextByte(); |
| | | final byte groupId = scanner.nextByte(); |
| | | |
| | | final List<String> refUrls = new ArrayList<String>(); |
| | | scanner.nextStrings(refUrls); |
| | | |
| | | final Set<String> attrs = new HashSet<String>(); |
| | | final Set<String> delattrs = new HashSet<String>(); |
| | | short protocolVersion = -1; |
| | | if (version >= REPLICATION_PROTOCOL_V4) |
| | | { |
| | | int length = getNextLength(in, pos); |
| | | outputCol.add(new String(in, pos, length, "UTF-8")); |
| | | pos += length + 1; |
| | | nRead++; |
| | | scanner.nextStrings(attrs); |
| | | |
| | | if (version >= REPLICATION_PROTOCOL_V5) |
| | | { |
| | | scanner.nextStrings(delattrs); |
| | | } |
| | | else |
| | | { |
| | | // Default to using the same set of attributes for deletes. |
| | | delattrs.addAll(attrs); |
| | | } |
| | | |
| | | protocolVersion = scanner.nextByte(); |
| | | } |
| | | return pos; |
| | | |
| | | return new DSInfo(dsId, dsUrl, rsId, generationId, status, assuredFlag, |
| | | assuredMode, safeDataLevel, groupId, refUrls, attrs, delattrs, |
| | | protocolVersion); |
| | | } |
| | | |
| | | private RSInfo nextRSInfo(ByteArrayScanner scanner, short version) |
| | | throws DataFormatException |
| | | { |
| | | final int rsId = scanner.nextIntUTF8(); |
| | | final long generationId = scanner.nextLongUTF8(); |
| | | final byte groupId = scanner.nextByte(); |
| | | |
| | | int weight = 1; |
| | | String serverUrl = null; |
| | | if (version >= REPLICATION_PROTOCOL_V4) |
| | | { |
| | | serverUrl = scanner.nextString(); |
| | | weight = scanner.nextIntUTF8(); |
| | | } |
| | | |
| | | return new RSInfo(rsId, serverUrl, generationId, groupId, weight); |
| | | } |
| | | |
| | | /** |
| | | * Creates a new message of the currently connected servers. |
| | | * Creates a new message of the currently connected servers. |
| | | * |
| | | * @param dsInfos The collection of currently connected DS servers ID. |
| | | * @param rsInfos The list of currently connected RS servers ID. |
| | |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public byte[] getBytes(short version) throws UnsupportedEncodingException |
| | | public byte[] getBytes(short version) |
| | | { |
| | | try |
| | | /** |
| | | * Message has the following form: |
| | | * <pdu type><number of following DSInfo entries>[<DSInfo>]* |
| | | * <number of following RSInfo entries>[<RSInfo>]* |
| | | */ |
| | | final ByteArrayBuilder builder = new ByteArrayBuilder(); |
| | | builder.append(MSG_TYPE_TOPOLOGY); |
| | | |
| | | // Put DS infos |
| | | builder.append((byte) replicaInfos.size()); |
| | | for (DSInfo dsInfo : replicaInfos.values()) |
| | | { |
| | | /** |
| | | * Message has the following form: |
| | | * <pdu type><number of following DSInfo entries>[<DSInfo>]* |
| | | * <number of following RSInfo entries>[<RSInfo>]* |
| | | */ |
| | | ByteArrayOutputStream oStream = new ByteArrayOutputStream(); |
| | | |
| | | /* Put the message type */ |
| | | oStream.write(MSG_TYPE_TOPOLOGY); |
| | | |
| | | // Put number of following DS info entries |
| | | oStream.write((byte) replicaInfos.size()); |
| | | |
| | | // Put DS info |
| | | for (DSInfo dsInfo : replicaInfos.values()) |
| | | builder.appendUTF8(dsInfo.getDsId()); |
| | | if (version >= REPLICATION_PROTOCOL_V6) |
| | | { |
| | | // Put DS id |
| | | byte[] byteServerId = |
| | | String.valueOf(dsInfo.getDsId()).getBytes("UTF-8"); |
| | | oStream.write(byteServerId); |
| | | oStream.write(0); |
| | | if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V6) |
| | | { |
| | | // Put DS URL |
| | | oStream.write(dsInfo.getDsUrl().getBytes("UTF-8")); |
| | | oStream.write(0); |
| | | } |
| | | // Put RS id |
| | | byteServerId = String.valueOf(dsInfo.getRsId()).getBytes("UTF-8"); |
| | | oStream.write(byteServerId); |
| | | oStream.write(0); |
| | | // Put the generation id |
| | | oStream.write(String.valueOf(dsInfo.getGenerationId()). |
| | | getBytes("UTF-8")); |
| | | oStream.write(0); |
| | | // Put DS status |
| | | oStream.write(dsInfo.getStatus().getValue()); |
| | | // Put DS assured flag |
| | | oStream.write(dsInfo.isAssured() ? (byte) 1 : (byte) 0); |
| | | // Put DS assured mode |
| | | oStream.write(dsInfo.getAssuredMode().getValue()); |
| | | // Put DS safe data level |
| | | oStream.write(dsInfo.getSafeDataLevel()); |
| | | // Put DS group id |
| | | oStream.write(dsInfo.getGroupId()); |
| | | |
| | | writeStrings(oStream, dsInfo.getRefUrls()); |
| | | |
| | | if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V4) |
| | | { |
| | | // Put ECL includes |
| | | writeStrings(oStream, dsInfo.getEclIncludes()); |
| | | |
| | | if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V5) |
| | | { |
| | | writeStrings(oStream, dsInfo.getEclIncludesForDeletes()); |
| | | } |
| | | |
| | | oStream.write(dsInfo.getProtocolVersion()); |
| | | } |
| | | builder.append(dsInfo.getDsUrl()); |
| | | } |
| | | builder.appendUTF8(dsInfo.getRsId()); |
| | | builder.appendUTF8(dsInfo.getGenerationId()); |
| | | builder.append(dsInfo.getStatus().getValue()); |
| | | builder.append(dsInfo.isAssured()); |
| | | builder.append(dsInfo.getAssuredMode().getValue()); |
| | | builder.append(dsInfo.getSafeDataLevel()); |
| | | builder.append(dsInfo.getGroupId()); |
| | | |
| | | // Put number of following RS info entries |
| | | oStream.write((byte) rsInfos.size()); |
| | | builder.appendStrings(dsInfo.getRefUrls()); |
| | | |
| | | // Put RS info |
| | | for (RSInfo rsInfo : rsInfos) |
| | | if (version >= REPLICATION_PROTOCOL_V4) |
| | | { |
| | | // Put RS id |
| | | byte[] byteServerId = |
| | | String.valueOf(rsInfo.getId()).getBytes("UTF-8"); |
| | | oStream.write(byteServerId); |
| | | oStream.write(0); |
| | | // Put the generation id |
| | | oStream.write(String.valueOf(rsInfo.getGenerationId()). |
| | | getBytes("UTF-8")); |
| | | oStream.write(0); |
| | | // Put RS group id |
| | | oStream.write(rsInfo.getGroupId()); |
| | | |
| | | if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V4) |
| | | builder.appendStrings(dsInfo.getEclIncludes()); |
| | | if (version >= REPLICATION_PROTOCOL_V5) |
| | | { |
| | | // Put server URL |
| | | oStream.write(rsInfo.getServerUrl().getBytes("UTF-8")); |
| | | oStream.write(0); |
| | | |
| | | // Put RS weight |
| | | oStream.write(String.valueOf(rsInfo.getWeight()).getBytes("UTF-8")); |
| | | oStream.write(0); |
| | | builder.appendStrings(dsInfo.getEclIncludesForDeletes()); |
| | | } |
| | | builder.append((byte) dsInfo.getProtocolVersion()); |
| | | } |
| | | } |
| | | |
| | | return oStream.toByteArray(); |
| | | } |
| | | catch (IOException e) |
| | | // Put RS infos |
| | | builder.append((byte) rsInfos.size()); |
| | | for (RSInfo rsInfo : rsInfos) |
| | | { |
| | | // never happens |
| | | throw new RuntimeException(e); |
| | | } |
| | | } |
| | | builder.appendUTF8(rsInfo.getId()); |
| | | builder.appendUTF8(rsInfo.getGenerationId()); |
| | | builder.append(rsInfo.getGroupId()); |
| | | |
| | | private void writeStrings(ByteArrayOutputStream oStream, |
| | | Collection<String> col) throws IOException, UnsupportedEncodingException |
| | | { |
| | | // Put collection length as a byte |
| | | oStream.write(col.size()); |
| | | for (String elem : col) |
| | | { |
| | | // Write the element and a 0 terminating byte |
| | | oStream.write(elem.getBytes("UTF-8")); |
| | | oStream.write(0); |
| | | if (version >= REPLICATION_PROTOCOL_V4) |
| | | { |
| | | builder.append(rsInfo.getServerUrl()); |
| | | builder.appendUTF8(rsInfo.getWeight()); |
| | | } |
| | | } |
| | | |
| | | return builder.toByteArray(); |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | |
| | | + "CONNECTED RS SERVERS:" |
| | | + "\n--------------------\n" |
| | | + rsStr |
| | | + (rsStr.equals("") ? "----------------------------\n" : ""); |
| | | + ("".equals(rsStr) ? "----------------------------\n" : ""); |
| | | } |
| | | |
| | | /** |