| | |
| | | */ |
| | | package org.opends.server.replication.protocol; |
| | | |
| | | import java.io.UnsupportedEncodingException; |
| | | import java.util.zip.DataFormatException; |
| | | |
| | | import java.io.UnsupportedEncodingException; |
| | | import org.opends.server.replication.common.AssuredMode; |
| | | import org.opends.server.replication.common.ChangeNumber; |
| | | import org.opends.server.replication.common.CSN; |
| | | |
| | | /** |
| | | * Abstract class that must be extended to define a message |
| | |
| | | protected short protocolVersion; |
| | | |
| | | /** |
| | | * The ChangeNumber of this update. |
| | | * The CSN of this update. |
| | | */ |
| | | protected ChangeNumber changeNumber; |
| | | protected CSN csn; |
| | | |
| | | /** |
| | | * True when the update must use assured replication. |
| | |
| | | // Decode header |
| | | int pos = decodeHeader(MSG_TYPE_GENERIC_UPDATE, bytes); |
| | | |
| | | /* Read the payload : all the remaining bytes but the terminating 0 */ |
| | | // Read the payload : all the remaining bytes but the terminating 0 |
| | | int length = bytes.length - pos; |
| | | payload = new byte[length]; |
| | | try |
| | |
| | | * <p> |
| | | * This constructor is only used for testing. |
| | | * |
| | | * @param changeNumber The ChangeNumber associated with the change |
| | | * encoded in this message. |
| | | * @param csn The CSN associated with the change encoded in this message. |
| | | * @param payload The payload that must be encoded in this message. |
| | | */ |
| | | public UpdateMsg(ChangeNumber changeNumber, byte[] payload) |
| | | public UpdateMsg(CSN csn, byte[] payload) |
| | | { |
| | | this.payload = payload; |
| | | this.protocolVersion = ProtocolVersion.getCurrentVersion(); |
| | | this.changeNumber = changeNumber; |
| | | this.csn = csn; |
| | | } |
| | | |
| | | /** |
| | | * Get the ChangeNumber from the message. |
| | | * @return the ChangeNumber |
| | | * Get the CSN from the message. |
| | | * @return the CSN |
| | | */ |
| | | public ChangeNumber getChangeNumber() |
| | | public CSN getCSN() |
| | | { |
| | | return changeNumber; |
| | | return csn; |
| | | } |
| | | |
| | | /** |
| | |
| | | public boolean equals(Object obj) |
| | | { |
| | | return obj != null && obj.getClass() == this.getClass() && |
| | | changeNumber.equals(((UpdateMsg) obj).changeNumber); |
| | | csn.equals(((UpdateMsg) obj).csn); |
| | | } |
| | | |
| | | /** |
| | |
| | | @Override |
| | | public int hashCode() |
| | | { |
| | | return changeNumber.hashCode(); |
| | | return csn.hashCode(); |
| | | } |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | @Override |
| | | public int compareTo(UpdateMsg msg) |
| | | { |
| | | return changeNumber.compareTo(msg.getChangeNumber()); |
| | | return csn.compareTo(msg.getCSN()); |
| | | } |
| | | |
| | | /** |
| | |
| | | protected byte[] encodeHeader(byte type, int additionalLength, short version) |
| | | throws UnsupportedEncodingException |
| | | { |
| | | byte[] changeNumberByte = |
| | | this.getChangeNumber().toString().getBytes("UTF-8"); |
| | | byte[] csnByte = getCSN().toString().getBytes("UTF-8"); |
| | | |
| | | /* The message header is stored in the form : |
| | | * <operation type><protocol version><changenumber><assured> |
| | | * <operation type><protocol version><CSN><assured> |
| | | * <assured mode> <safe data level> |
| | | * the length of result byte array is therefore : |
| | | * 1 + 1 + change number length + 1 + 1 |
| | | * 1 + 1 + CSN length + 1 + 1 |
| | | * + 1 + 1 + additional_length |
| | | */ |
| | | int length = 6 + changeNumberByte.length + additionalLength; |
| | | int length = 6 + csnByte.length + additionalLength; |
| | | |
| | | byte[] encodedMsg = new byte[length]; |
| | | |
| | | /* put the type of the operation */ |
| | | // put the type of the operation |
| | | encodedMsg[0] = type; |
| | | |
| | | /* put the protocol version */ |
| | | // put the protocol version |
| | | encodedMsg[1] = (byte)ProtocolVersion.getCurrentVersion(); |
| | | int pos = 2; |
| | | |
| | | /* Put the ChangeNumber */ |
| | | pos = addByteArray(changeNumberByte, encodedMsg, pos); |
| | | // Put the CSN |
| | | pos = addByteArray(csnByte, encodedMsg, pos); |
| | | |
| | | /* Put the assured flag */ |
| | | // Put the assured flag |
| | | encodedMsg[pos++] = (assuredFlag ? (byte) 1 : 0); |
| | | |
| | | /* Put the assured mode */ |
| | | // Put the assured mode |
| | | encodedMsg[pos++] = assuredMode.getValue(); |
| | | |
| | | /* Put the safe data level */ |
| | | // Put the safe data level |
| | | encodedMsg[pos++] = safeDataLevel; |
| | | |
| | | return encodedMsg; |
| | |
| | | throws DataFormatException |
| | | { |
| | | /* The message header is stored in the form : |
| | | * <operation type><protocol version><changenumber><assured> |
| | | * <operation type><protocol version><CSN><assured> |
| | | * <assured mode> <safe data level> |
| | | */ |
| | | if (!(type == encodedMsg[0])) |
| | | throw new DataFormatException("byte[] is not a valid update msg: " |
| | | + encodedMsg[0]); |
| | | |
| | | /* read the protocol version */ |
| | | protocolVersion = (short)encodedMsg[1]; |
| | | // read the protocol version |
| | | protocolVersion = encodedMsg[1]; |
| | | |
| | | try |
| | | { |
| | | /* Read the changeNumber */ |
| | | // Read the CSN |
| | | int pos = 2; |
| | | int length = getNextLength(encodedMsg, pos); |
| | | String changenumberStr = new String(encodedMsg, pos, length, "UTF-8"); |
| | | String csnStr = new String(encodedMsg, pos, length, "UTF-8"); |
| | | pos += length + 1; |
| | | changeNumber = new ChangeNumber(changenumberStr); |
| | | csn = new CSN(csnStr); |
| | | |
| | | /* Read the assured information */ |
| | | // Read the assured information |
| | | assuredFlag = encodedMsg[pos++] == 1; |
| | | |
| | | /* Read the assured mode */ |
| | | // Read the assured mode |
| | | assuredMode = AssuredMode.valueOf(encodedMsg[pos++]); |
| | | |
| | | /* Read the safe data level */ |
| | | // Read the safe data level |
| | | safeDataLevel = encodedMsg[pos++]; |
| | | |
| | | return pos; |
| | |
| | | public byte[] getBytes(short protocolVersion) |
| | | throws UnsupportedEncodingException |
| | | { |
| | | /* Encode the header in a byte[] large enough to also contain the payload */ |
| | | // Encode the header in a byte[] large enough to also contain the payload |
| | | byte[] resultByteArray = encodeHeader(MSG_TYPE_GENERIC_UPDATE, |
| | | payload.length, ProtocolVersion.getCurrentVersion()); |
| | | |
| | | int pos = resultByteArray.length - payload.length; |
| | | |
| | | /* Add the payload */ |
| | | // Add the payload |
| | | for (int i = 0; i < payload.length; i++, pos++) |
| | | { |
| | | resultByteArray[pos] = payload[i]; |