mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Matthew Swift
10.25.2013 4fd152ec8ba98ac9a70202dbac2b3a579df1033a
Fix OPENDJ-951: Reduce size and frequency of replication MonitorMsg

* use a more compact encoding for change numbers in monitor and last change time replication protocol messages. Rather than encoding the change numbers as 28 byte hex strings, we now use a 14 byte binary representation
* see CR-1818 for more details.
7 files modified
417 ■■■■■ changed files
opends/src/server/org/opends/server/replication/common/ChangeNumber.java 109 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/common/ServerState.java 38 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/protocol/AckMsg.java 2 ●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/protocol/ChangeTimeHeartbeatMsg.java 152 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/protocol/MonitorMsg.java 106 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/protocol/ProtocolVersion.java 8 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/protocol/ReplicationMsg.java 2 ●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/common/ChangeNumber.java
@@ -29,20 +29,71 @@
import java.util.Date;
import org.opends.server.types.ByteSequence;
import org.opends.server.types.ByteSequenceReader;
import org.opends.server.types.ByteString;
import org.opends.server.types.ByteStringBuilder;
/**
 * Class used to represent Change Numbers.
 */
public class ChangeNumber implements java.io.Serializable,
                                     java.lang.Comparable<ChangeNumber>
{
  /**
   * The number of bytes used by the byte string representation of a change
   * number.
   *
   * @see #valueOf(ByteSequence)
   * @see #toByteString()
   * @see #toByteString(ByteStringBuilder)
   */
  public static final int BYTE_ENCODING_LENGTH = 14;
  /**
   * The number of characters used by the string representation of a change
   * number.
   *
   * @see #valueOf(String)
   * @see #toString()
   */
  public static final int STRING_ENCODING_LENGTH = 28;
  private static final long serialVersionUID = -8802722277749190740L;
  private final long timeStamp;
  private final int seqnum;
  private final int serverId;
  // A String representation of the ChangeNumber suitable for network
  // transmission.
  private String formatedString = null;
  /**
   * Parses the provided {@link #toString()} representation of a change number.
   *
   * @param s
   *          The string to be parsed.
   * @return The parsed change number.
   * @see #toString()
   */
  public static ChangeNumber valueOf(String s)
  {
    return new ChangeNumber(s);
  }
  /**
   * Decodes the provided {@link #toByteString()} representation of a change
   * number.
   *
   * @param bs
   *          The byte sequence to be parsed.
   * @return The decoded change number.
   * @see #toByteString()
   */
  public static ChangeNumber valueOf(ByteSequence bs)
  {
    ByteSequenceReader reader = bs.asReader();
    long timeStamp = reader.getLong();
    int serverId = reader.getShort() & 0xffff;
    int seqnum = reader.getInt();
    return new ChangeNumber(timeStamp, seqnum, serverId);
  }
  /**
   * Create a new ChangeNumber from a String.
@@ -59,8 +110,6 @@
    temp = str.substring(20, 28);
    seqnum = Integer.parseInt(temp, 16);
    formatedString = str;
  }
  /**
@@ -140,25 +189,47 @@
  }
  /**
   * Encodes this change number as a byte string.
   * <p>
   * NOTE: this representation must not be modified otherwise interop with
   * earlier protocol versions will be broken.
   *
   * @return The encoded representation of this change number.
   * @see #valueOf(ByteSequence)
   */
  public ByteString toByteString()
  {
    return toByteString(new ByteStringBuilder(BYTE_ENCODING_LENGTH))
        .toByteString();
  }
  /**
   * Encodes this change number into the provided byte string builder.
   * <p>
   * NOTE: this representation must not be modified otherwise interop with
   * earlier protocol versions will be broken.
   *
   * @param builder
   *          The byte string builder.
   * @return The byte string builder containing the encoded change number.
   * @see #valueOf(ByteSequence)
   */
  public ByteStringBuilder toByteString(ByteStringBuilder builder)
  {
    return builder.append(timeStamp).append((short) (serverId & 0xffff))
        .append(seqnum);
  }
  /**
   * Convert the ChangeNumber to a printable String.
   * <p>
   * NOTE: this representation must not be modified otherwise interop with
   * earlier protocol versions will be broken.
   *
   * @return the string
   */
  public String toString()
  {
    return format();
  }
  /**
   * Convert the ChangeNumber to a String that is suitable for network
   * transmission.
   *
   * @return the string
   */
  public String format()
  {
    if (formatedString != null)
      return formatedString;
    return String.format("%016x%04x%08x", timeStamp, serverId, seqnum);
  }
opends/src/server/org/opends/server/replication/common/ServerState.java
@@ -27,6 +27,7 @@
 */
package org.opends.server.replication.common;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.Date;
@@ -37,6 +38,8 @@
import java.util.Set;
import java.util.zip.DataFormatException;
import org.opends.server.protocols.asn1.ASN1Writer;
import org.opends.server.replication.protocol.ProtocolVersion;
import org.opends.server.types.ByteString;
@@ -278,6 +281,41 @@
    }
    return values;
  }
  /**
   * Encodes this server state to the provided ASN1 writer.
   *
   * @param writer
   *          The ASN1 writer.
   * @param protocolVersion
   *          The replication protocol version.
   * @throws IOException
   *           If an error occurred during encoding.
   */
  public void writeTo(ASN1Writer writer, short protocolVersion)
      throws IOException
  {
    synchronized (list)
    {
      if (protocolVersion >= ProtocolVersion.REPLICATION_PROTOCOL_V7)
      {
        for (ChangeNumber cn : list.values())
        {
          writer.writeOctetString(cn.toByteString());
        }
      }
      else
      {
        for (ChangeNumber cn : list.values())
        {
          writer.writeOctetString(cn.toString());
        }
      }
    }
  }
  /**
   * Return the text representation of ServerState.
   * @return the text representation of ServerState
opends/src/server/org/opends/server/replication/protocol/AckMsg.java
@@ -230,7 +230,7 @@
      oStream.write(MSG_TYPE_ACK);
      /* Put the ChangeNumber */
      byte[] changeNumberByte = changeNumber.format().getBytes("UTF-8");
      byte[] changeNumberByte = changeNumber.toString().getBytes("UTF-8");
      oStream.write(changeNumberByte);
      oStream.write(0);
opends/src/server/org/opends/server/replication/protocol/ChangeTimeHeartbeatMsg.java
@@ -23,20 +23,24 @@
 *
 *
 *      Copyright 2006-2009 Sun Microsystems, Inc.
 *      Portions copyright 2013 ForgeRock AS.
 */
package org.opends.server.replication.protocol;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.zip.DataFormatException;
import org.opends.server.replication.common.ChangeNumber;
import org.opends.server.types.ByteSequenceReader;
import org.opends.server.types.ByteString;
import org.opends.server.types.ByteStringBuilder;
/**
 * Class that define messages sent by a replication domain (DS)
 * to the replication server to let the RS know the DS current
 * change time.
 * Class that define messages sent by a replication domain (DS) to the
 * replication server to let the RS know the DS current change time.
 */
public class ChangeTimeHeartbeatMsg extends ReplicationMsg
{
@@ -45,26 +49,25 @@
   */
  private final ChangeNumber changeNumber;
  /**
   * Constructor of a Change Time Heartbeat message.
   */
  public ChangeTimeHeartbeatMsg()
  {
    this.changeNumber = new ChangeNumber((long)0,0,0);
  }
  /**
   * Constructor of a Change Time Heartbeat message providing
   * the change time value in a change number.
   * @param cn The provided change number.
   * Constructor of a Change Time Heartbeat message providing the change time
   * value in a change number.
   *
   * @param cn
   *          The provided change number.
   */
  public ChangeTimeHeartbeatMsg(ChangeNumber cn)
  {
    this.changeNumber = cn;
  }
  /**
   * Get a change number with the transmitted change time.
   *
   * @return the ChangeNumber
   */
  public ChangeNumber getChangeNumber()
@@ -72,77 +75,92 @@
    return changeNumber;
  }
  /**
   * Encode a change time message.
   * @return The encoded message.
   * @throws UnsupportedEncodingException When an error occurs.
   */
  public byte[] encode() throws UnsupportedEncodingException
  {
    byte[] changeNumberByte =
      this.getChangeNumber().toString().getBytes("UTF-8");
    int length = changeNumberByte.length;
    byte[] encodedMsg = new byte[length];
    /* Put the ChangeNumber */
    addByteArray(changeNumberByte, encodedMsg, 0);
    return encodedMsg;
  }
  /**
   * Creates a message from a provided byte array.
   * @param in The provided byte array.
   * @throws DataFormatException When an error occurs.
   *
   * @param in
   *          The provided byte array.
   * @param version
   *          The version of the protocol to use to decode the msg.
   * @throws DataFormatException
   *           When an error occurs.
   */
  public ChangeTimeHeartbeatMsg(byte[] in) throws DataFormatException
  public ChangeTimeHeartbeatMsg(byte[] in, short version)
      throws DataFormatException
  {
    final ByteSequenceReader reader = ByteString.wrap(in).asReader();
    try
    {
      /* Read the changeNumber */
      /* First byte is the type */
      if (in[0] != MSG_TYPE_CT_HEARTBEAT)
      if (reader.get() != MSG_TYPE_CT_HEARTBEAT)
      {
        // Throw better exception below.
        throw new IllegalArgumentException();
      }
      if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V7)
      {
        changeNumber = ChangeNumber.valueOf(reader
            .getByteSequence(ChangeNumber.BYTE_ENCODING_LENGTH));
      }
      else
      {
        changeNumber = ChangeNumber.valueOf(reader
            .getString(ChangeNumber.STRING_ENCODING_LENGTH));
        reader.get(); // Read trailing 0 byte.
      }
      if (reader.remaining() > 0)
      {
        // Throw better exception below.
        throw new IllegalArgumentException();
      }
    }
    catch (Exception e)
    {
      // Index out of bounds, bad format, etc.
        throw new DataFormatException("byte[] is not a valid CT_HEARTBEAT msg");
      }
      int pos = 1;
      int length = getNextLength(in, pos);
      String changenumberStr = new String(in, pos, length, "UTF-8");
      changeNumber = new ChangeNumber(changenumberStr);
    }
    catch (UnsupportedEncodingException e)
    {
      throw new DataFormatException("UTF-8 is not supported by this jvm.");
    }
    catch (IllegalArgumentException e)
    {
      throw new DataFormatException(e.getMessage());
    }
  }
  /**
   * Get a byte array from the message.
   * @return The byte array containing the PDU of the message.
   * @throws UnsupportedEncodingException When an error occurs.
   * {@inheritDoc}
   */
  public byte[] getBytes() throws UnsupportedEncodingException
  @Override
  public byte[] getBytes()
  {
    try {
      ByteArrayOutputStream oStream = new ByteArrayOutputStream();
    return getBytes(ProtocolVersion.getCurrentVersion());
  }
      /* Put the type of the operation */
      oStream.write(MSG_TYPE_CT_HEARTBEAT);
      /* Put the ChangeNumber */
      byte[] changeNumberByte = changeNumber.toString().getBytes("UTF-8");
      oStream.write(changeNumberByte);
      oStream.write(0);
      return oStream.toByteArray();
    } catch (IOException e)
  /**
   * {@inheritDoc}
   */
  @Override
  public byte[] getBytes(short protocolVersion)
    {
      // never happens
      return null;
    if (protocolVersion >= ProtocolVersion.REPLICATION_PROTOCOL_V7)
    {
      final ByteStringBuilder builder = new ByteStringBuilder(
          ChangeNumber.BYTE_ENCODING_LENGTH + 1 /* type + csn */);
      builder.append(MSG_TYPE_CT_HEARTBEAT);
      changeNumber.toByteString(builder);
      return builder.toByteArray();
    }
    else
    {
      final ByteStringBuilder builder = new ByteStringBuilder(
          ChangeNumber.STRING_ENCODING_LENGTH + 2 /* type + csn str + nul */);
      builder.append(MSG_TYPE_CT_HEARTBEAT);
      builder.append(changeNumber.toString());
      builder.append((byte) 0); // For compatibility with earlier protocol
                                // versions.
      return builder.toByteArray();
    }
  }
}
opends/src/server/org/opends/server/replication/protocol/MonitorMsg.java
@@ -27,12 +27,12 @@
 */
package org.opends.server.replication.protocol;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.zip.DataFormatException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Set;
import java.util.Map;
import org.opends.server.replication.common.ServerState;
import org.opends.server.protocols.asn1.ASN1Reader;
@@ -277,8 +277,16 @@
        // loop on the list of CN of the state
        while(asn1Reader.hasNextElement())
        {
          String s = asn1Reader.readOctetStringAsString();
          ChangeNumber cn = new ChangeNumber(s);
          ChangeNumber cn;
          if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V7)
          {
            cn = ChangeNumber.valueOf(asn1Reader.readOctetString());
          }
          else
          {
            cn = ChangeNumber.valueOf(asn1Reader.readOctetStringAsString());
          }
          if ((data.replServerDbState != null) && (serverId == 0))
          {
            // we are on the first CN that is a fake CN to store the serverId
@@ -323,7 +331,6 @@
   */
  @Override
  public byte[] getBytes()
  throws UnsupportedEncodingException
  {
    return getBytes(ProtocolVersion.getCurrentVersion());
  }
@@ -333,12 +340,10 @@
   */
  @Override
  public byte[] getBytes(short protocolVersion)
     throws UnsupportedEncodingException
  {
    try
    {
      ByteStringBuilder byteBuilder = new ByteStringBuilder();
      ASN1Writer writer = ASN1.getWriter(byteBuilder);
      if (protocolVersion > ProtocolVersion.REPLICATION_PROTOCOL_V1)
      {
@@ -362,66 +367,22 @@
      }
      /* Put the serverStates ... */
      ASN1Writer writer = ASN1.getWriter(byteBuilder);
      writer.writeStartSequence();
      {
      /* first put the Replication Server state */
      writer.writeStartSequence();
      ArrayList<ByteString> cnOctetList =
        data.replServerDbState.toASN1ArrayList();
      for (ByteString soci : cnOctetList)
      {
        writer.writeOctetString(soci);
          data.replServerDbState.writeTo(writer, protocolVersion);
      }
      writer.writeEndSequence();
      // then the LDAP server data
      Set<Integer> servers = data.ldapStates.keySet();
      for (Integer sid : servers)
      {
        ServerState statei = data.ldapStates.get(sid).state;
        Long outime = data.ldapStates.get(sid).approxFirstMissingDate;
        // retrieves the change numbers as an arrayList of ANSN1OctetString
        cnOctetList = statei.toASN1ArrayList();
        writer.writeStartSequence();
        // a fake changenumber helps storing the LDAP server ID
        ChangeNumber cn = new ChangeNumber(outime,1,sid);
        writer.writeOctetString(cn.toString());
        // the changenumbers that make the state
        for (ByteString soci : cnOctetList)
        {
          writer.writeOctetString(soci);
        }
        writer.writeEndSequence();
      }
        writeServerStates(protocolVersion, writer, false /* DS */);
      // then the RS server datas
      servers = data.rsStates.keySet();
      for (Integer sid : servers)
      {
        ServerState statei = data.rsStates.get(sid).state;
        Long outime = data.rsStates.get(sid).approxFirstMissingDate;
        // retrieves the change numbers as an arrayList of ANSN1OctetString
        cnOctetList = statei.toASN1ArrayList();
        writer.writeStartSequence();
        // a fake changenumber helps storing the LDAP server ID
        ChangeNumber cn = new ChangeNumber(outime,0,sid);
        writer.writeOctetString(cn.toString());
        // the changenumbers that make the state
        for (ByteString soci : cnOctetList)
        {
          writer.writeOctetString(soci);
        writeServerStates(protocolVersion, writer, true /* RS */);
        }
        writer.writeEndSequence();
      }
      writer.writeEndSequence();
      if (protocolVersion > ProtocolVersion.REPLICATION_PROTOCOL_V1)
@@ -456,6 +417,39 @@
    }
  }
  private void writeServerStates(short protocolVersion, ASN1Writer writer,
      boolean writeRSStates) throws IOException
  {
    Map<Integer, ServerData> servers = writeRSStates ? data.rsStates
        : data.ldapStates;
    for (Map.Entry<Integer, ServerData> server : servers.entrySet())
    {
      writer.writeStartSequence();
      {
        /*
         * A fake change number helps storing the LDAP server ID. The sequence
         * number will be used to differentiate between an LDAP server (1) or an
         * RS (0).
         */
        ChangeNumber cn = new ChangeNumber(
            server.getValue().approxFirstMissingDate, writeRSStates ? 0 : 1,
            server.getKey());
        if (protocolVersion >= ProtocolVersion.REPLICATION_PROTOCOL_V7)
        {
          writer.writeOctetString(cn.toByteString());
        }
        else
        {
          writer.writeOctetString(cn.toString());
        }
        // the changenumbers that make the state
        server.getValue().state.writeTo(writer, protocolVersion);
      }
      writer.writeEndSequence();
    }
  }
  /**
   * Get the state of the replication server that sent this message.
   * @return The state.
opends/src/server/org/opends/server/replication/protocol/ProtocolVersion.java
@@ -81,9 +81,15 @@
  public static final short REPLICATION_PROTOCOL_V6 = 6;
  /**
   * The constant for the 7th version of the replication protocol.
   * - compact encoding for length, CSNs, and server IDs.
   */
  public static final short REPLICATION_PROTOCOL_V7 = 7;
  /**
   * The replication protocol version used by the instance of RS/DS in this VM.
   */
  private static final short CURRENT_VERSION = REPLICATION_PROTOCOL_V6;
  private static final short CURRENT_VERSION = REPLICATION_PROTOCOL_V7;
  /**
   * Gets the current version of the replication protocol.
opends/src/server/org/opends/server/replication/protocol/ReplicationMsg.java
@@ -246,7 +246,7 @@
        msg = new ECLUpdateMsg(buffer);
      break;
      case MSG_TYPE_CT_HEARTBEAT:
        msg = new ChangeTimeHeartbeatMsg(buffer);
        msg = new ChangeTimeHeartbeatMsg(buffer, version);
      break;
      case MSG_TYPE_REPL_SERVER_START_DS:
        msg = new ReplServerStartDSMsg(buffer);