mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
23.17.2014 88cfe5045d77d433ce02b0ef10ee84c9d4fb15e2
opends/src/server/org/opends/server/replication/protocol/MonitorMsg.java
@@ -27,21 +27,16 @@
package org.opends.server.replication.protocol;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.zip.DataFormatException;
import org.opends.server.protocols.asn1.ASN1;
import org.opends.server.protocols.asn1.ASN1Reader;
import org.opends.server.protocols.asn1.ASN1Writer;
import org.opends.server.replication.common.CSN;
import org.opends.server.replication.common.ServerState;
import org.opends.server.types.ByteSequenceReader;
import org.opends.server.types.ByteString;
import org.opends.server.types.ByteStringBuilder;
/**
 * This message is part of the replication protocol.
@@ -69,7 +64,7 @@
   * first missing change for each LDAP server connected to a Replication
   * Server.
   */
  static class ServerData
  private static class ServerData
  {
    private ServerState state;
    private long approxFirstMissingDate;
@@ -79,7 +74,7 @@
   * Data structure to manage the state of this replication server
   * and the state information for the servers connected to it.
   */
  static class SubTopoMonitorData
  private static class SubTopoMonitorData
  {
    /** This replication server DbState. */
    private ServerState replServerDbState;
@@ -91,7 +86,7 @@
        new HashMap<Integer, ServerData>();
  }
  private SubTopoMonitorData data = new SubTopoMonitorData();
  private final SubTopoMonitorData data = new SubTopoMonitorData();
  /**
   * Creates a new MonitorMsg.
@@ -120,18 +115,22 @@
   * @param state The server state.
   * @param approxFirstMissingDate  The approximation of the date
   * of the older missing change. null when none.
   * @param isLDAP Specifies whether the server is a LS or a RS
   * @param isLDAPServer Specifies whether the server is a DS or a RS
   */
  public void setServerState(int serverId, ServerState state,
      long approxFirstMissingDate, boolean isLDAP)
      long approxFirstMissingDate, boolean isLDAPServer)
  {
    ServerData sd = new ServerData();
    final ServerData sd = new ServerData();
    sd.state = state;
    sd.approxFirstMissingDate = approxFirstMissingDate;
    if (isLDAP)
    if (isLDAPServer)
    {
      data.ldapStates.put(serverId, sd);
    }
    else
    {
      data.rsStates.put(serverId, sd);
    }
  }
  /**
@@ -154,7 +153,6 @@
    return data.rsStates.get(serverId).state;
  }
  /**
   * Get the approximation of the date of the older missing change for the
   * LDAP Server with the provided server Id.
@@ -185,69 +183,32 @@
   * @throws DataFormatException If the byte array does not contain a valid
   *                             encoded form of the ServerStartMessage.
   */
  public MonitorMsg(byte[] in, short version) throws DataFormatException
  MonitorMsg(byte[] in, short version) throws DataFormatException
  {
    ByteSequenceReader reader = ByteString.wrap(in).asReader();
    final ByteArrayScanner scanner = new ByteArrayScanner(in);
    if (scanner.nextByte() != MSG_TYPE_REPL_SERVER_MONITOR)
    {
      throw new DataFormatException("input is not a valid "
          + getClass().getCanonicalName());
    }
    if (version == ProtocolVersion.REPLICATION_PROTOCOL_V1)
    {
      try
      {
        /* first byte is the type */
        if (in[0] != MSG_TYPE_REPL_SERVER_MONITOR)
          throw new DataFormatException("input is not a valid " +
              this.getClass().getCanonicalName());
        int pos = 1;
        // sender
        int length = getNextLength(in, pos);
        String senderIDString = new String(in, pos, length, "UTF-8");
        this.senderID = Integer.valueOf(senderIDString);
        pos += length +1;
        // destination
        length = getNextLength(in, pos);
        String destinationString = new String(in, pos, length, "UTF-8");
        this.destination = Integer.valueOf(destinationString);
        pos += length +1;
        reader.position(pos);
      }
      catch (UnsupportedEncodingException e)
      {
        throw new DataFormatException("UTF-8 is not supported by this jvm.");
      }
      this.senderID = scanner.nextIntUTF8();
      this.destination = scanner.nextIntUTF8();
    }
    else if (version <= ProtocolVersion.REPLICATION_PROTOCOL_V3)
    {
      this.senderID = scanner.nextShort();
      this.destination = scanner.nextShort();
    }
    else
    {
      if (reader.get() != MSG_TYPE_REPL_SERVER_MONITOR)
        throw new DataFormatException("input is not a valid " +
            this.getClass().getCanonicalName());
      /*
       * V4 and above uses integers for its serverIds while V2 and V3
       * use shorts.
       */
      if (version <= ProtocolVersion.REPLICATION_PROTOCOL_V3)
      {
        // sender
        this.senderID = reader.getShort();
        // destination
        this.destination = reader.getShort();
      }
      else
      {
        // sender
        this.senderID = reader.getInt();
        // destination
        this.destination = reader.getInt();
      }
      this.senderID = scanner.nextInt();
      this.destination = scanner.nextInt();
    }
    ASN1Reader asn1Reader = ASN1.getReader(reader);
    ASN1Reader asn1Reader = scanner.getASN1Reader();
    try
    {
      asn1Reader.readStartSequence();
@@ -297,13 +258,7 @@
        else
        {
          // the next states are the server states
          ServerData sd = new ServerData();
          sd.state = newState;
          sd.approxFirstMissingDate = outime;
          if (isLDAPServer)
            data.ldapStates.put(serverId, sd);
          else
            data.rsStates.put(serverId, sd);
          setServerState(serverId, newState, outime, isLDAPServer);
        }
      }
      asn1Reader.readEndSequence();
@@ -312,39 +267,19 @@
    }
  }
  /**
   * {@inheritDoc}
   */
  /** {@inheritDoc} */
  @Override
  public byte[] getBytes(short protocolVersion)
  {
    try
    {
      ByteStringBuilder byteBuilder = new ByteStringBuilder();
      if (protocolVersion > ProtocolVersion.REPLICATION_PROTOCOL_V1)
      {
        /* put the type of the operation */
        byteBuilder.append(MSG_TYPE_REPL_SERVER_MONITOR);
        /*
         * V4 and above uses integers for its serverIds while V2 and V3
         * use shorts.
         */
        if (protocolVersion >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
        {
          byteBuilder.append(senderID);
          byteBuilder.append(destination);
        }
        else
        {
          byteBuilder.append((short)senderID);
          byteBuilder.append((short)destination);
        }
      }
      final ByteArrayBuilder builder = new ByteArrayBuilder();
      builder.append(MSG_TYPE_REPL_SERVER_MONITOR);
      append(builder, senderID, protocolVersion);
      append(builder, destination, protocolVersion);
      /* Put the serverStates ... */
      ASN1Writer writer = ASN1.getWriter(byteBuilder);
      ASN1Writer writer = builder.getASN1Writer();
      writer.writeStartSequence();
      {
        /* first put the Replication Server state */
@@ -354,39 +289,18 @@
        }
        writer.writeEndSequence();
        // then the LDAP server data
        // then the DS + RS server data
        writeServerStates(protocolVersion, writer, false /* DS */);
        // then the RS server datas
        writeServerStates(protocolVersion, writer, true /* RS */);
      }
      writer.writeEndSequence();
      if (protocolVersion > ProtocolVersion.REPLICATION_PROTOCOL_V1)
      if (protocolVersion == ProtocolVersion.REPLICATION_PROTOCOL_V1)
      {
        return byteBuilder.toByteArray();
        // legacy coding mistake
        builder.append((byte) 0);
      }
      else
      {
        byte[] temp = byteBuilder.toByteArray();
        byte[] senderBytes = String.valueOf(senderID).getBytes("UTF-8");
        byte[] destinationBytes = String.valueOf(destination).getBytes("UTF-8");
        int length = 1 +  1 + senderBytes.length +
        1 + destinationBytes.length + temp.length +1;
        byte[] resultByteArray = new byte[length];
        /* put the type of the operation */
        resultByteArray[0] = MSG_TYPE_REPL_SERVER_MONITOR;
        int pos = 1;
        pos = addByteArray(senderBytes, resultByteArray, pos);
        pos = addByteArray(destinationBytes, resultByteArray, pos);
        pos = addByteArray(temp, resultByteArray, pos);
        return resultByteArray;
      }
      return builder.toByteArray();
    }
    catch (Exception e)
    {
@@ -394,6 +308,23 @@
    }
  }
  private void append(final ByteArrayBuilder builder, int data,
      short protocolVersion)
  {
    if (protocolVersion == ProtocolVersion.REPLICATION_PROTOCOL_V1)
    {
      builder.appendUTF8(data);
    }
    else if (protocolVersion <= ProtocolVersion.REPLICATION_PROTOCOL_V3)
    {
      builder.append((short) data);
    }
    else // protocolVersion >= ProtocolVersion.REPLICATION_PROTOCOL_V4
    {
      builder.append(data);
    }
  }
  private void writeServerStates(short protocolVersion, ASN1Writer writer,
      boolean writeRSStates) throws IOException
  {
@@ -454,8 +385,6 @@
    return data.rsStates.keySet().iterator();
  }
  /**
   * Get the destination.
   *
@@ -466,8 +395,6 @@
    return destination;
  }
  /**
   * Get the server ID of the server that sent this message.
   *
@@ -478,15 +405,11 @@
    return senderID;
  }
  /**
   * {@inheritDoc}
   */
  /** {@inheritDoc} */
  @Override
  public String toString()
  {
    StringBuilder stateS = new StringBuilder("\nRState:[");
    final StringBuilder stateS = new StringBuilder("\nRState:[");
    stateS.append(data.replServerDbState);
    stateS.append("]");
@@ -502,10 +425,10 @@
    stateS.append("\nRSStates:[");
    for (Entry<Integer, ServerData> entry : data.rsStates.entrySet())
    {
      ServerData sd = entry.getValue();
      final ServerData sd = entry.getValue();
      stateS.append("\n[RSState(").append(entry.getKey()).append(")=")
            .append(sd.state).append("]").append(" afmd=")
            .append(sd.approxFirstMissingDate + "]");
            .append(sd.approxFirstMissingDate).append("]");
    }
    return getClass().getCanonicalName() +
    "[ sender=" + this.senderID +