| | |
| | | package org.opends.server.replication.protocol; |
| | | |
| | | import java.io.IOException; |
| | | import java.io.UnsupportedEncodingException; |
| | | import java.util.HashMap; |
| | | import java.util.Iterator; |
| | | import java.util.Map; |
| | | import java.util.Map.Entry; |
| | | import java.util.zip.DataFormatException; |
| | | |
| | | import org.opends.server.protocols.asn1.ASN1; |
| | | import org.opends.server.protocols.asn1.ASN1Reader; |
| | | import org.opends.server.protocols.asn1.ASN1Writer; |
| | | import org.opends.server.replication.common.CSN; |
| | | import org.opends.server.replication.common.ServerState; |
| | | import org.opends.server.types.ByteSequenceReader; |
| | | import org.opends.server.types.ByteString; |
| | | import org.opends.server.types.ByteStringBuilder; |
| | | |
| | | /** |
| | | * This message is part of the replication protocol. |
| | |
| | | * first missing change for each LDAP server connected to a Replication |
| | | * Server. |
| | | */ |
| | | static class ServerData |
| | | private static class ServerData |
| | | { |
| | | private ServerState state; |
| | | private long approxFirstMissingDate; |
| | |
| | | * Data structure to manage the state of this replication server |
| | | * and the state information for the servers connected to it. |
| | | */ |
| | | static class SubTopoMonitorData |
| | | private static class SubTopoMonitorData |
| | | { |
| | | /** This replication server DbState. */ |
| | | private ServerState replServerDbState; |
| | |
| | | new HashMap<Integer, ServerData>(); |
| | | } |
| | | |
| | | /** |
| | |  * Holds the state of this replication server and the state information |
| | |  * for the servers connected to it. |
| | |  */ |
| | | private final SubTopoMonitorData data = new SubTopoMonitorData(); |
| | | |
| | | /** |
| | | * Creates a new MonitorMsg. |
| | |
| | | * @param state The server state. |
| | | * @param approxFirstMissingDate The approximation of the date |
| | | * of the older missing change. null when none. |
| | | * @param isLDAP Specifies whether the server is a LS or a RS |
| | | * @param isLDAPServer Specifies whether the server is a DS or a RS |
| | | */ |
| | | public void setServerState(int serverId, ServerState state, |
| | |     long approxFirstMissingDate, boolean isLDAPServer) |
| | | { |
| | |   // Pair the state with its first-missing-change date in a single record. |
| | |   final ServerData sd = new ServerData(); |
| | |   sd.state = state; |
| | |   sd.approxFirstMissingDate = approxFirstMissingDate; |
| | | |
| | |   // LDAP servers and replication servers are kept in separate maps, |
| | |   // each keyed by the server id. |
| | |   if (isLDAPServer) |
| | |   { |
| | |     data.ldapStates.put(serverId, sd); |
| | |   } |
| | |   else |
| | |   { |
| | |     data.rsStates.put(serverId, sd); |
| | |   } |
| | | } |
| | | |
| | | /** |
| | |
| | | return data.rsStates.get(serverId).state; |
| | | } |
| | | |
| | | |
| | | /** |
| | | * Get the approximation of the date of the older missing change for the |
| | | * LDAP Server with the provided server Id. |
| | |
| | | * @throws DataFormatException If the byte array does not contain a valid |
| | | * encoded form of the MonitorMsg. |
| | | */ |
| | | public MonitorMsg(byte[] in, short version) throws DataFormatException |
| | | MonitorMsg(byte[] in, short version) throws DataFormatException |
| | | { |
| | | ByteSequenceReader reader = ByteString.wrap(in).asReader(); |
| | | final ByteArrayScanner scanner = new ByteArrayScanner(in); |
| | | if (scanner.nextByte() != MSG_TYPE_REPL_SERVER_MONITOR) |
| | | { |
| | | throw new DataFormatException("input is not a valid " |
| | | + getClass().getCanonicalName()); |
| | | } |
| | | |
| | | if (version == ProtocolVersion.REPLICATION_PROTOCOL_V1) |
| | | { |
| | | try |
| | | { |
| | | /* first byte is the type */ |
| | | if (in[0] != MSG_TYPE_REPL_SERVER_MONITOR) |
| | | throw new DataFormatException("input is not a valid " + |
| | | this.getClass().getCanonicalName()); |
| | | int pos = 1; |
| | | |
| | | // sender |
| | | int length = getNextLength(in, pos); |
| | | String senderIDString = new String(in, pos, length, "UTF-8"); |
| | | this.senderID = Integer.valueOf(senderIDString); |
| | | pos += length +1; |
| | | |
| | | // destination |
| | | length = getNextLength(in, pos); |
| | | String destinationString = new String(in, pos, length, "UTF-8"); |
| | | this.destination = Integer.valueOf(destinationString); |
| | | pos += length +1; |
| | | |
| | | reader.position(pos); |
| | | } |
| | | catch (UnsupportedEncodingException e) |
| | | { |
| | | throw new DataFormatException("UTF-8 is not supported by this jvm."); |
| | | } |
| | | this.senderID = scanner.nextIntUTF8(); |
| | | this.destination = scanner.nextIntUTF8(); |
| | | } |
| | | else if (version <= ProtocolVersion.REPLICATION_PROTOCOL_V3) |
| | | { |
| | | this.senderID = scanner.nextShort(); |
| | | this.destination = scanner.nextShort(); |
| | | } |
| | | else |
| | | { |
| | | if (reader.get() != MSG_TYPE_REPL_SERVER_MONITOR) |
| | | throw new DataFormatException("input is not a valid " + |
| | | this.getClass().getCanonicalName()); |
| | | |
| | | /* |
| | | * V4 and above uses integers for its serverIds while V2 and V3 |
| | | * use shorts. |
| | | */ |
| | | if (version <= ProtocolVersion.REPLICATION_PROTOCOL_V3) |
| | | { |
| | | // sender |
| | | this.senderID = reader.getShort(); |
| | | |
| | | // destination |
| | | this.destination = reader.getShort(); |
| | | } |
| | | else |
| | | { |
| | | // sender |
| | | this.senderID = reader.getInt(); |
| | | |
| | | // destination |
| | | this.destination = reader.getInt(); |
| | | } |
| | | this.senderID = scanner.nextInt(); |
| | | this.destination = scanner.nextInt(); |
| | | } |
| | | |
| | | |
| | | ASN1Reader asn1Reader = ASN1.getReader(reader); |
| | | ASN1Reader asn1Reader = scanner.getASN1Reader(); |
| | | try |
| | | { |
| | | asn1Reader.readStartSequence(); |
| | |
| | | else |
| | | { |
| | | // the next states are the server states |
| | | ServerData sd = new ServerData(); |
| | | sd.state = newState; |
| | | sd.approxFirstMissingDate = outime; |
| | | if (isLDAPServer) |
| | | data.ldapStates.put(serverId, sd); |
| | | else |
| | | data.rsStates.put(serverId, sd); |
| | | setServerState(serverId, newState, outime, isLDAPServer); |
| | | } |
| | | } |
| | | asn1Reader.readEndSequence(); |
| | |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public byte[] getBytes(short protocolVersion) |
| | | { |
| | | try |
| | | { |
| | | ByteStringBuilder byteBuilder = new ByteStringBuilder(); |
| | | |
| | | if (protocolVersion > ProtocolVersion.REPLICATION_PROTOCOL_V1) |
| | | { |
| | | /* put the type of the operation */ |
| | | byteBuilder.append(MSG_TYPE_REPL_SERVER_MONITOR); |
| | | |
| | | /* |
| | | * V4 and above uses integers for its serverIds while V2 and V3 |
| | | * use shorts. |
| | | */ |
| | | if (protocolVersion >= ProtocolVersion.REPLICATION_PROTOCOL_V4) |
| | | { |
| | | byteBuilder.append(senderID); |
| | | byteBuilder.append(destination); |
| | | } |
| | | else |
| | | { |
| | | byteBuilder.append((short)senderID); |
| | | byteBuilder.append((short)destination); |
| | | } |
| | | } |
| | | final ByteArrayBuilder builder = new ByteArrayBuilder(); |
| | | builder.append(MSG_TYPE_REPL_SERVER_MONITOR); |
| | | append(builder, senderID, protocolVersion); |
| | | append(builder, destination, protocolVersion); |
| | | |
| | | /* Put the serverStates ... */ |
| | | ASN1Writer writer = ASN1.getWriter(byteBuilder); |
| | | ASN1Writer writer = builder.getASN1Writer(); |
| | | writer.writeStartSequence(); |
| | | { |
| | | /* first put the Replication Server state */ |
| | |
| | | } |
| | | writer.writeEndSequence(); |
| | | |
| | | // then the LDAP server data |
| | | // then the DS + RS server data |
| | | writeServerStates(protocolVersion, writer, false /* DS */); |
| | | |
| | | // then the RS server datas |
| | | writeServerStates(protocolVersion, writer, true /* RS */); |
| | | } |
| | | writer.writeEndSequence(); |
| | | |
| | | if (protocolVersion > ProtocolVersion.REPLICATION_PROTOCOL_V1) |
| | | if (protocolVersion == ProtocolVersion.REPLICATION_PROTOCOL_V1) |
| | | { |
| | | return byteBuilder.toByteArray(); |
| | | // legacy coding mistake |
| | | builder.append((byte) 0); |
| | | } |
| | | else |
| | | { |
| | | byte[] temp = byteBuilder.toByteArray(); |
| | | |
| | | byte[] senderBytes = String.valueOf(senderID).getBytes("UTF-8"); |
| | | byte[] destinationBytes = String.valueOf(destination).getBytes("UTF-8"); |
| | | |
| | | int length = 1 + 1 + senderBytes.length + |
| | | 1 + destinationBytes.length + temp.length +1; |
| | | byte[] resultByteArray = new byte[length]; |
| | | |
| | | /* put the type of the operation */ |
| | | resultByteArray[0] = MSG_TYPE_REPL_SERVER_MONITOR; |
| | | int pos = 1; |
| | | |
| | | pos = addByteArray(senderBytes, resultByteArray, pos); |
| | | pos = addByteArray(destinationBytes, resultByteArray, pos); |
| | | pos = addByteArray(temp, resultByteArray, pos); |
| | | |
| | | return resultByteArray; |
| | | } |
| | | return builder.toByteArray(); |
| | | } |
| | | catch (Exception e) |
| | | { |
| | |
| | | } |
| | | } |
| | | |
| | | /** |
| | |  * Appends an integer value to the builder, choosing the representation |
| | |  * from the protocol version: {@code appendUTF8} for V1, the value narrowed |
| | |  * to a {@code short} for versions up to V3, and the plain {@code int} for |
| | |  * anything newer. |
| | |  * |
| | |  * @param builder the builder that receives the value |
| | |  * @param data the value to append |
| | |  * @param protocolVersion the protocol version that selects the representation |
| | |  */ |
| | | private void append(final ByteArrayBuilder builder, int data, |
| | |     short protocolVersion) |
| | | { |
| | |   // V1 is handled first, exactly as in the version comparison chain. |
| | |   if (protocolVersion == ProtocolVersion.REPLICATION_PROTOCOL_V1) |
| | |   { |
| | |     builder.appendUTF8(data); |
| | |     return; |
| | |   } |
| | | |
| | |   // Everything else up to V3 gets the value narrowed to a short. |
| | |   if (protocolVersion <= ProtocolVersion.REPLICATION_PROTOCOL_V3) |
| | |   { |
| | |     builder.append((short) data); |
| | |     return; |
| | |   } |
| | | |
| | |   // protocolVersion >= ProtocolVersion.REPLICATION_PROTOCOL_V4 |
| | |   builder.append(data); |
| | | } |
| | | |
| | | private void writeServerStates(short protocolVersion, ASN1Writer writer, |
| | | boolean writeRSStates) throws IOException |
| | | { |
| | |
| | | return data.rsStates.keySet().iterator(); |
| | | } |
| | | |
| | | |
| | | |
| | | /** |
| | | * Get the destination. |
| | | * |
| | |
| | | return destination; |
| | | } |
| | | |
| | | |
| | | |
| | | /** |
| | | * Get the server ID of the server that sent this message. |
| | | * |
| | |
| | | return senderID; |
| | | } |
| | | |
| | | |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public String toString() |
| | | { |
| | | StringBuilder stateS = new StringBuilder("\nRState:["); |
| | | final StringBuilder stateS = new StringBuilder("\nRState:["); |
| | | stateS.append(data.replServerDbState); |
| | | stateS.append("]"); |
| | | |
| | |
| | | stateS.append("\nRSStates:["); |
| | | for (Entry<Integer, ServerData> entry : data.rsStates.entrySet()) |
| | | { |
| | | ServerData sd = entry.getValue(); |
| | | final ServerData sd = entry.getValue(); |
| | | stateS.append("\n[RSState(").append(entry.getKey()).append(")=") |
| | | .append(sd.state).append("]").append(" afmd=") |
| | | .append(sd.approxFirstMissingDate + "]"); |
| | | .append(sd.approxFirstMissingDate).append("]"); |
| | | } |
| | | return getClass().getCanonicalName() + |
| | | "[ sender=" + this.senderID + |