opends/src/server/org/opends/server/replication/common/ChangeNumber.java
@@ -29,20 +29,71 @@ import java.util.Date; import org.opends.server.types.ByteSequence; import org.opends.server.types.ByteSequenceReader; import org.opends.server.types.ByteString; import org.opends.server.types.ByteStringBuilder; /** * Class used to represent Change Numbers. */ public class ChangeNumber implements java.io.Serializable, java.lang.Comparable<ChangeNumber> { /** * The number of bytes used by the byte string representation of a change * number. * * @see #valueOf(ByteSequence) * @see #toByteString() * @see #toByteString(ByteStringBuilder) */ public static final int BYTE_ENCODING_LENGTH = 14; /** * The number of characters used by the string representation of a change * number. * * @see #valueOf(String) * @see #toString() */ public static final int STRING_ENCODING_LENGTH = 28; private static final long serialVersionUID = -8802722277749190740L; private final long timeStamp; private final int seqnum; private final int serverId; // A String representation of the ChangeNumber suitable for network // transmission. private String formatedString = null; /** * Parses the provided {@link #toString()} representation of a change number. * * @param s * The string to be parsed. * @return The parsed change number. * @see #toString() */ public static ChangeNumber valueOf(String s) { return new ChangeNumber(s); } /** * Decodes the provided {@link #toByteString()} representation of a change * number. * * @param bs * The byte sequence to be parsed. * @return The decoded change number. * @see #toByteString() */ public static ChangeNumber valueOf(ByteSequence bs) { ByteSequenceReader reader = bs.asReader(); long timeStamp = reader.getLong(); int serverId = reader.getShort() & 0xffff; int seqnum = reader.getInt(); return new ChangeNumber(timeStamp, seqnum, serverId); } /** * Create a new ChangeNumber from a String. @@ -59,8 +110,6 @@ temp = str.substring(20, 28); seqnum = Integer.parseInt(temp, 16); formatedString = str; } /** @@ -140,25 +189,47 @@ } /** * Encodes this change number as a byte string. * <p> * NOTE: this representation must not be modified otherwise interop with * earlier protocol versions will be broken. * * @return The encoded representation of this change number. * @see #valueOf(ByteSequence) */ public ByteString toByteString() { return toByteString(new ByteStringBuilder(BYTE_ENCODING_LENGTH)) .toByteString(); } /** * Encodes this change number into the provided byte string builder. * <p> * NOTE: this representation must not be modified otherwise interop with * earlier protocol versions will be broken. * * @param builder * The byte string builder. * @return The byte string builder containing the encoded change number. * @see #valueOf(ByteSequence) */ public ByteStringBuilder toByteString(ByteStringBuilder builder) { return builder.append(timeStamp).append((short) (serverId & 0xffff)) .append(seqnum); } /** * Convert the ChangeNumber to a printable String. * <p> * NOTE: this representation must not be modified otherwise interop with * earlier protocol versions will be broken. * * @return the string */ public String toString() { return format(); } /** * Convert the ChangeNumber to a String that is suitable for network * transmission. * * @return the string */ public String format() { if (formatedString != null) return formatedString; return String.format("%016x%04x%08x", timeStamp, serverId, seqnum); } opends/src/server/org/opends/server/replication/common/ServerState.java
@@ -27,6 +27,7 @@ */ package org.opends.server.replication.common; import java.io.IOException; import java.io.UnsupportedEncodingException; import java.util.ArrayList; import java.util.Date; @@ -37,6 +38,8 @@ import java.util.Set; import java.util.zip.DataFormatException; import org.opends.server.protocols.asn1.ASN1Writer; import org.opends.server.replication.protocol.ProtocolVersion; import org.opends.server.types.ByteString; @@ -278,6 +281,41 @@ } return values; } /** * Encodes this server state to the provided ASN1 writer. * * @param writer * The ASN1 writer. * @param protocolVersion * The replication protocol version. * @throws IOException * If an error occurred during encoding. */ public void writeTo(ASN1Writer writer, short protocolVersion) throws IOException { synchronized (list) { if (protocolVersion >= ProtocolVersion.REPLICATION_PROTOCOL_V7) { for (ChangeNumber cn : list.values()) { writer.writeOctetString(cn.toByteString()); } } else { for (ChangeNumber cn : list.values()) { writer.writeOctetString(cn.toString()); } } } } /** * Return the text representation of ServerState. * @return the text representation of ServerState opends/src/server/org/opends/server/replication/protocol/AckMsg.java
@@ -230,7 +230,7 @@ oStream.write(MSG_TYPE_ACK); /* Put the ChangeNumber */ byte[] changeNumberByte = changeNumber.format().getBytes("UTF-8"); byte[] changeNumberByte = changeNumber.toString().getBytes("UTF-8"); oStream.write(changeNumberByte); oStream.write(0); opends/src/server/org/opends/server/replication/protocol/ChangeTimeHeartbeatMsg.java
@@ -23,20 +23,24 @@ * * * Copyright 2006-2009 Sun Microsystems, Inc. * Portions copyright 2013 ForgeRock AS. */ package org.opends.server.replication.protocol; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.UnsupportedEncodingException; import java.util.zip.DataFormatException; import org.opends.server.replication.common.ChangeNumber; import org.opends.server.types.ByteSequenceReader; import org.opends.server.types.ByteString; import org.opends.server.types.ByteStringBuilder; /** * Class that define messages sent by a replication domain (DS) * to the replication server to let the RS know the DS current * change time. * Class that define messages sent by a replication domain (DS) to the * replication server to let the RS know the DS current change time. */ public class ChangeTimeHeartbeatMsg extends ReplicationMsg { @@ -45,26 +49,25 @@ */ private final ChangeNumber changeNumber; /** * Constructor of a Change Time Heartbeat message. */ public ChangeTimeHeartbeatMsg() { this.changeNumber = new ChangeNumber((long)0,0,0); } /** * Constructor of a Change Time Heartbeat message providing * the change time value in a change number. * @param cn The provided change number. * Constructor of a Change Time Heartbeat message providing the change time * value in a change number. * * @param cn * The provided change number. */ public ChangeTimeHeartbeatMsg(ChangeNumber cn) { this.changeNumber = cn; } /** * Get a change number with the transmitted change time. * * @return the ChangeNumber */ public ChangeNumber getChangeNumber() @@ -72,77 +75,92 @@ return changeNumber; } /** * Encode a change time message. * @return The encoded message. * @throws UnsupportedEncodingException When an error occurs. */ public byte[] encode() throws UnsupportedEncodingException { byte[] changeNumberByte = this.getChangeNumber().toString().getBytes("UTF-8"); int length = changeNumberByte.length; byte[] encodedMsg = new byte[length]; /* Put the ChangeNumber */ addByteArray(changeNumberByte, encodedMsg, 0); return encodedMsg; } /** * Creates a message from a provided byte array. * @param in The provided byte array. * @throws DataFormatException When an error occurs. * * @param in * The provided byte array. * @param version * The version of the protocol to use to decode the msg. * @throws DataFormatException * When an error occurs. */ public ChangeTimeHeartbeatMsg(byte[] in) throws DataFormatException public ChangeTimeHeartbeatMsg(byte[] in, short version) throws DataFormatException { final ByteSequenceReader reader = ByteString.wrap(in).asReader(); try { /* Read the changeNumber */ /* First byte is the type */ if (in[0] != MSG_TYPE_CT_HEARTBEAT) if (reader.get() != MSG_TYPE_CT_HEARTBEAT) { // Throw better exception below. throw new IllegalArgumentException(); } if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V7) { changeNumber = ChangeNumber.valueOf(reader .getByteSequence(ChangeNumber.BYTE_ENCODING_LENGTH)); } else { changeNumber = ChangeNumber.valueOf(reader .getString(ChangeNumber.STRING_ENCODING_LENGTH)); reader.get(); // Read trailing 0 byte. } if (reader.remaining() > 0) { // Throw better exception below. throw new IllegalArgumentException(); } } catch (Exception e) { // Index out of bounds, bad format, etc. throw new DataFormatException("byte[] is not a valid CT_HEARTBEAT msg"); } int pos = 1; int length = getNextLength(in, pos); String changenumberStr = new String(in, pos, length, "UTF-8"); changeNumber = new ChangeNumber(changenumberStr); } catch (UnsupportedEncodingException e) { throw new DataFormatException("UTF-8 is not supported by this jvm."); } catch (IllegalArgumentException e) { throw new DataFormatException(e.getMessage()); } } /** * Get a byte array from the message. * @return The byte array containing the PDU of the message. * @throws UnsupportedEncodingException When an error occurs. * {@inheritDoc} */ public byte[] getBytes() throws UnsupportedEncodingException @Override public byte[] getBytes() { try { ByteArrayOutputStream oStream = new ByteArrayOutputStream(); return getBytes(ProtocolVersion.getCurrentVersion()); } /* Put the type of the operation */ oStream.write(MSG_TYPE_CT_HEARTBEAT); /* Put the ChangeNumber */ byte[] changeNumberByte = changeNumber.toString().getBytes("UTF-8"); oStream.write(changeNumberByte); oStream.write(0); return oStream.toByteArray(); } catch (IOException e) /** * {@inheritDoc} */ @Override public byte[] getBytes(short protocolVersion) { // never happens return null; if (protocolVersion >= ProtocolVersion.REPLICATION_PROTOCOL_V7) { final ByteStringBuilder builder = new ByteStringBuilder( ChangeNumber.BYTE_ENCODING_LENGTH + 1 /* type + csn */); builder.append(MSG_TYPE_CT_HEARTBEAT); changeNumber.toByteString(builder); return builder.toByteArray(); } else { final ByteStringBuilder builder = new ByteStringBuilder( ChangeNumber.STRING_ENCODING_LENGTH + 2 /* type + csn str + nul */); builder.append(MSG_TYPE_CT_HEARTBEAT); builder.append(changeNumber.toString()); builder.append((byte) 0); // For compatibility with earlier protocol // versions. return builder.toByteArray(); } } } opends/src/server/org/opends/server/replication/protocol/MonitorMsg.java
@@ -27,12 +27,12 @@ */ package org.opends.server.replication.protocol; import java.io.IOException; import java.io.UnsupportedEncodingException; import java.util.zip.DataFormatException; import java.util.ArrayList; import java.util.HashMap; import java.util.Iterator; import java.util.Set; import java.util.Map; import org.opends.server.replication.common.ServerState; import org.opends.server.protocols.asn1.ASN1Reader; @@ -277,8 +277,16 @@ // loop on the list of CN of the state while(asn1Reader.hasNextElement()) { String s = asn1Reader.readOctetStringAsString(); ChangeNumber cn = new ChangeNumber(s); ChangeNumber cn; if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V7) { cn = ChangeNumber.valueOf(asn1Reader.readOctetString()); } else { cn = ChangeNumber.valueOf(asn1Reader.readOctetStringAsString()); } if ((data.replServerDbState != null) && (serverId == 0)) { // we are on the first CN that is a fake CN to store the serverId @@ -323,7 +331,6 @@ */ @Override public byte[] getBytes() throws UnsupportedEncodingException { return getBytes(ProtocolVersion.getCurrentVersion()); } @@ -333,12 +340,10 @@ */ @Override public byte[] getBytes(short protocolVersion) throws UnsupportedEncodingException { try { ByteStringBuilder byteBuilder = new ByteStringBuilder(); ASN1Writer writer = ASN1.getWriter(byteBuilder); if (protocolVersion > ProtocolVersion.REPLICATION_PROTOCOL_V1) { @@ -362,66 +367,22 @@ } /* Put the serverStates ... */ ASN1Writer writer = ASN1.getWriter(byteBuilder); writer.writeStartSequence(); { /* first put the Replication Server state */ writer.writeStartSequence(); ArrayList<ByteString> cnOctetList = data.replServerDbState.toASN1ArrayList(); for (ByteString soci : cnOctetList) { writer.writeOctetString(soci); data.replServerDbState.writeTo(writer, protocolVersion); } writer.writeEndSequence(); // then the LDAP server data Set<Integer> servers = data.ldapStates.keySet(); for (Integer sid : servers) { ServerState statei = data.ldapStates.get(sid).state; Long outime = data.ldapStates.get(sid).approxFirstMissingDate; // retrieves the change numbers as an arrayList of ANSN1OctetString cnOctetList = statei.toASN1ArrayList(); writer.writeStartSequence(); // a fake changenumber helps storing the LDAP server ID ChangeNumber cn = new ChangeNumber(outime,1,sid); writer.writeOctetString(cn.toString()); // the changenumbers that make the state for (ByteString soci : cnOctetList) { writer.writeOctetString(soci); } writer.writeEndSequence(); } writeServerStates(protocolVersion, writer, false /* DS */); // then the RS server datas servers = data.rsStates.keySet(); for (Integer sid : servers) { ServerState statei = data.rsStates.get(sid).state; Long outime = data.rsStates.get(sid).approxFirstMissingDate; // retrieves the change numbers as an arrayList of ANSN1OctetString cnOctetList = statei.toASN1ArrayList(); writer.writeStartSequence(); // a fake changenumber helps storing the LDAP server ID ChangeNumber cn = new ChangeNumber(outime,0,sid); writer.writeOctetString(cn.toString()); // the changenumbers that make the state for (ByteString soci : cnOctetList) { writer.writeOctetString(soci); writeServerStates(protocolVersion, writer, true /* RS */); } writer.writeEndSequence(); } writer.writeEndSequence(); if (protocolVersion > ProtocolVersion.REPLICATION_PROTOCOL_V1) @@ -456,6 +417,39 @@ } } private void writeServerStates(short protocolVersion, ASN1Writer writer, boolean writeRSStates) throws IOException { Map<Integer, ServerData> servers = writeRSStates ? data.rsStates : data.ldapStates; for (Map.Entry<Integer, ServerData> server : servers.entrySet()) { writer.writeStartSequence(); { /* * A fake change number helps storing the LDAP server ID. The sequence * number will be used to differentiate between an LDAP server (1) or an * RS (0). */ ChangeNumber cn = new ChangeNumber( server.getValue().approxFirstMissingDate, writeRSStates ? 0 : 1, server.getKey()); if (protocolVersion >= ProtocolVersion.REPLICATION_PROTOCOL_V7) { writer.writeOctetString(cn.toByteString()); } else { writer.writeOctetString(cn.toString()); } // the changenumbers that make the state server.getValue().state.writeTo(writer, protocolVersion); } writer.writeEndSequence(); } } /** * Get the state of the replication server that sent this message. * @return The state. opends/src/server/org/opends/server/replication/protocol/ProtocolVersion.java
@@ -81,9 +81,15 @@ public static final short REPLICATION_PROTOCOL_V6 = 6; /** * The constant for the 7th version of the replication protocol. * - compact encoding for length, CSNs, and server IDs. */ public static final short REPLICATION_PROTOCOL_V7 = 7; /** * The replication protocol version used by the instance of RS/DS in this VM. */ private static final short CURRENT_VERSION = REPLICATION_PROTOCOL_V6; private static final short CURRENT_VERSION = REPLICATION_PROTOCOL_V7; /** * Gets the current version of the replication protocol. opends/src/server/org/opends/server/replication/protocol/ReplicationMsg.java
@@ -246,7 +246,7 @@ msg = new ECLUpdateMsg(buffer); break; case MSG_TYPE_CT_HEARTBEAT: msg = new ChangeTimeHeartbeatMsg(buffer); msg = new ChangeTimeHeartbeatMsg(buffer, version); break; case MSG_TYPE_REPL_SERVER_START_DS: msg = new ReplServerStartDSMsg(buffer);