mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
02.57.2013 157717b205d4c1f957cf810e04e06f11530c619c
opends/src/server/org/opends/server/replication/common/ServerState.java
@@ -38,18 +38,16 @@
import org.opends.server.types.ByteString;
/**
 * This class is used to associate serverIds with ChangeNumbers.
 * This class is used to associate serverIds with {@link CSN}s.
 * <p>
 * For example, it is exchanged with the replication servers at connection
 * establishment time to communicate
 * "which ChangeNumbers last seen by a serverId"
 * establishment time to communicate "which CSNs was last seen by a serverId".
 */
public class ServerState implements Iterable<Integer>
{
  /** Associates a serverId with a ChangeNumber. */
  private final Map<Integer, ChangeNumber> serverIdToChangeNumber =
      new HashMap<Integer, ChangeNumber>();
  /** Associates a serverId with a CSN. */
  private final Map<Integer, CSN> serverIdToCSN = new HashMap<Integer, CSN>();
  /**
   * Whether the state has been saved to persistent storage. It starts at true,
   * and moves to false when an update is made to the current object.
@@ -71,9 +69,9 @@
   */
  public void clear()
  {
    synchronized (serverIdToChangeNumber)
    synchronized (serverIdToCSN)
    {
      serverIdToChangeNumber.clear();
      serverIdToCSN.clear();
    }
  }
@@ -95,8 +93,8 @@
    {
      while (endpos > pos)
      {
        // FIXME JNR: why store the serverId separately from the changeNumber
        // since the changeNumber already contains the serverId?
        // FIXME JNR: why store the serverId separately from the CSN since the
        // CSN already contains the serverId?
        // read the ServerId
        int length = getNextLength(in, pos);
@@ -104,14 +102,14 @@
        int serverId = Integer.valueOf(serverIdString);
        pos += length +1;
        // read the ChangeNumber
        // read the CSN
        length = getNextLength(in, pos);
        String cnString = new String(in, pos, length, "UTF-8");
        ChangeNumber cn = new ChangeNumber(cnString);
        String csnString = new String(in, pos, length, "UTF-8");
        CSN csn = new CSN(csnString);
        pos += length +1;
        // Add the serverId
        serverIdToChangeNumber.put(serverId, cn);
        serverIdToCSN.put(serverId, csn);
      }
    } catch (UnsupportedEncodingException e)
    {
@@ -121,7 +119,7 @@
  /**
   * Get the length of the next String encoded in the in byte array.
   * This method is used to cut the different parts (server ids, change number)
   * This method is used to cut the different parts (serverIds, CSN)
   * of a server state.
   *
   * @param in the byte array where to calculate the string.
@@ -143,26 +141,25 @@
  }
  /**
   * Update the Server State with a ChangeNumber.
   * Update the Server State with a CSN.
   *
   * @param changeNumber    The committed ChangeNumber.
   *
   * @param csn The committed CSN.
   * @return a boolean indicating if the update was meaningful.
   */
  public boolean update(ChangeNumber changeNumber)
  public boolean update(CSN csn)
  {
    if (changeNumber == null)
    if (csn == null)
      return false;
    saved = false;
    synchronized (serverIdToChangeNumber)
    synchronized (serverIdToCSN)
    {
      int serverId = changeNumber.getServerId();
      ChangeNumber oldCN = serverIdToChangeNumber.get(serverId);
      if (oldCN == null || changeNumber.newer(oldCN))
      int serverId = csn.getServerId();
      CSN oldCSN = serverIdToCSN.get(serverId);
      if (oldCSN == null || csn.newer(oldCSN))
      {
        serverIdToChangeNumber.put(serverId, changeNumber);
        serverIdToCSN.put(serverId, csn);
        return true;
      }
      return false;
@@ -170,12 +167,10 @@
  }
  /**
   * Update the Server State with a Server State. Every change number of this
   * object is updated with the change number of the passed server state if
   * it is newer.
   * Update the Server State with a Server State. Every CSN of this object is
   * updated with the CSN of the passed server state if it is newer.
   *
   * @param serverState the server state to use for the update.
   *
   * @return a boolean indicating if the update was meaningful.
   */
  public boolean update(ServerState serverState)
@@ -184,9 +179,9 @@
      return false;
    boolean updated = false;
    for (ChangeNumber cn : serverState.serverIdToChangeNumber.values())
    for (CSN csn : serverState.serverIdToCSN.values())
    {
      if (update(cn))
      if (update(csn))
      {
        updated = true;
      }
@@ -206,7 +201,7 @@
      return false;
    }
    synchronized (serverIdToChangeNumber)
    synchronized (serverIdToCSN)
    {
      clear();
      return update(serverState);
@@ -228,9 +223,9 @@
  {
    Set<String> set = new HashSet<String>();
    synchronized (serverIdToChangeNumber)
    synchronized (serverIdToCSN)
    {
      for (ChangeNumber change : serverIdToChangeNumber.values())
      for (CSN change : serverIdToCSN.values())
      {
        Date date = new Date(change.getTime());
        set.add(change + " " + date + " " + change.getTime());
@@ -241,20 +236,20 @@
  }
  /**
   * Return an ArrayList of ANS1OctetString encoding the ChangeNumbers
   * Return an ArrayList of ANS1OctetString encoding the CSNs
   * contained in the ServerState.
   * @return an ArrayList of ANS1OctetString encoding the ChangeNumbers
   * @return an ArrayList of ANS1OctetString encoding the CSNs
   * contained in the ServerState.
   */
  public ArrayList<ByteString> toASN1ArrayList()
  {
    ArrayList<ByteString> values = new ArrayList<ByteString>(0);
    synchronized (serverIdToChangeNumber)
    synchronized (serverIdToCSN)
    {
      for (ChangeNumber changeNumber : serverIdToChangeNumber.values())
      for (CSN csn : serverIdToCSN.values())
      {
        values.add(ByteString.valueOf(changeNumber.toString()));
        values.add(ByteString.valueOf(csn.toString()));
      }
    }
    return values;
@@ -275,20 +270,20 @@
  public void writeTo(ASN1Writer writer, short protocolVersion)
      throws IOException
  {
    synchronized (serverIdToChangeNumber)
    synchronized (serverIdToCSN)
    {
      if (protocolVersion >= ProtocolVersion.REPLICATION_PROTOCOL_V7)
      {
        for (ChangeNumber cn : serverIdToChangeNumber.values())
        for (CSN csn : serverIdToCSN.values())
        {
          writer.writeOctetString(cn.toByteString());
          writer.writeOctetString(csn.toByteString());
        }
      }
      else
      {
        for (ChangeNumber cn : serverIdToChangeNumber.values())
        for (CSN csn : serverIdToCSN.values())
        {
          writer.writeOctetString(cn.toString());
          writer.writeOctetString(csn.toString());
        }
      }
    }
@@ -303,13 +298,13 @@
  {
    StringBuilder buffer = new StringBuilder();
    synchronized (serverIdToChangeNumber)
    synchronized (serverIdToCSN)
    {
      for (ChangeNumber change : serverIdToChangeNumber.values())
      for (CSN change : serverIdToCSN.values())
      {
        buffer.append(change).append(" ");
      }
      if (!serverIdToChangeNumber.isEmpty())
      if (!serverIdToCSN.isEmpty())
        buffer.deleteCharAt(buffer.length() - 1);
    }
@@ -317,39 +312,37 @@
  }
  /**
   * Returns the {@code ChangeNumber} contained in this server state which
   * corresponds to the provided server ID.
   * Returns the {@code CSN} contained in this server state which corresponds to
   * the provided server ID.
   *
   * @param serverId
   *          The server ID.
   * @return The {@code ChangeNumber} contained in this server state which
   * @return The {@code CSN} contained in this server state which
   *         corresponds to the provided server ID.
   */
  public ChangeNumber getChangeNumber(int serverId)
  public CSN getCSN(int serverId)
  {
    return serverIdToChangeNumber.get(serverId);
    return serverIdToCSN.get(serverId);
  }
  /**
   * Returns the largest (most recent) {@code ChangeNumber} in this server
   * state.
   * Returns the largest (most recent) {@code CSN} in this server state.
   *
   * @return The largest (most recent) {@code ChangeNumber} in this server
   *         state.
   * @return The largest (most recent) {@code CSN} in this server state.
   */
  public ChangeNumber getMaxChangeNumber()
  public CSN getMaxCSN()
  {
    ChangeNumber maxCN = null;
    CSN maxCSN = null;
    synchronized (serverIdToChangeNumber)
    synchronized (serverIdToCSN)
    {
      for (ChangeNumber tmpMax : serverIdToChangeNumber.values())
      for (CSN csn : serverIdToCSN.values())
      {
        if (maxCN == null || tmpMax.newer(maxCN))
          maxCN = tmpMax;
        if (maxCSN == null || csn.newer(maxCSN))
          maxCSN = csn;
      }
    }
    return maxCN;
    return maxCSN;
  }
  /**
@@ -373,14 +366,14 @@
   */
  public byte[] getBytes() throws UnsupportedEncodingException
  {
    synchronized (serverIdToChangeNumber)
    synchronized (serverIdToCSN)
    {
      final int size = serverIdToChangeNumber.size();
      final int size = serverIdToCSN.size();
      List<String> idList = new ArrayList<String>(size);
      List<String> cnList = new ArrayList<String>(size);
      List<String> csnList = new ArrayList<String>(size);
      // calculate the total length needed to allocate byte array
      int length = 0;
      for (Entry<Integer, ChangeNumber> entry : serverIdToChangeNumber
      for (Entry<Integer, CSN> entry : serverIdToCSN
          .entrySet())
      {
        // serverId is useless, see comment in ServerState ctor
@@ -388,9 +381,9 @@
        idList.add(serverIdStr);
        length += serverIdStr.length() + 1;
        String changeNumberStr = entry.getValue().toString();
        cnList.add(changeNumberStr);
        length += changeNumberStr.length() + 1;
        String csnStr = entry.getValue().toString();
        csnList.add(csnStr);
        length += csnStr.length() + 1;
      }
      byte[] result = new byte[length];
@@ -400,7 +393,7 @@
      {
        String str = idList.get(i);
        pos = addByteArray(str.getBytes("UTF-8"), result, pos);
        str = cnList.get(i);
        str = csnList.get(i);
        pos = addByteArray(str.getBytes("UTF-8"), result, pos);
      }
      return result;
@@ -413,12 +406,12 @@
  @Override
  public Iterator<Integer> iterator()
  {
    return serverIdToChangeNumber.keySet().iterator();
    return serverIdToCSN.keySet().iterator();
  }
  /**
   * Check that all the ChangeNumbers in the covered serverState are also in
   * this serverState.
   * Check that all the CSNs in the covered serverState are also in this
   * serverState.
   *
   * @param covered The ServerState that needs to be checked.
   * @return A boolean indicating if this ServerState covers the ServerState
@@ -426,7 +419,7 @@
   */
  public boolean cover(ServerState covered)
  {
    for (ChangeNumber coveredChange : covered.serverIdToChangeNumber.values())
    for (CSN coveredChange : covered.serverIdToCSN.values())
    {
      if (!cover(coveredChange))
      {
@@ -437,16 +430,16 @@
  }
  /**
   * Checks that the ChangeNumber given as a parameter is in this ServerState.
   * Checks that the CSN given as a parameter is in this ServerState.
   *
   * @param   covered The ChangeNumber that should be checked.
   * @return  A boolean indicating if this ServerState contains the ChangeNumber
   *          given in parameter.
   * @param   covered The CSN that should be checked.
   * @return  A boolean indicating if this ServerState contains the CSN given in
   *          parameter.
   */
  public boolean cover(ChangeNumber covered)
  public boolean cover(CSN covered)
  {
    ChangeNumber change =
        this.serverIdToChangeNumber.get(covered.getServerId());
    CSN change =
        this.serverIdToCSN.get(covered.getServerId());
    return change != null && !change.older(covered);
  }
@@ -457,7 +450,7 @@
   */
  public boolean isEmpty()
  {
    return serverIdToChangeNumber.isEmpty();
    return serverIdToCSN.isEmpty();
  }
  /**
@@ -467,9 +460,9 @@
  public ServerState duplicate()
  {
    ServerState newState = new ServerState();
    synchronized (serverIdToChangeNumber)
    synchronized (serverIdToCSN)
    {
      newState.serverIdToChangeNumber.putAll(serverIdToChangeNumber);
      newState.serverIdToCSN.putAll(serverIdToCSN);
    }
    return newState;
  }
@@ -492,21 +485,23 @@
    }
    int diff = 0;
    for (Integer serverId : ss1.serverIdToChangeNumber.keySet())
    for (Integer serverId : ss1.serverIdToCSN.keySet())
    {
      ChangeNumber cn1 = ss1.serverIdToChangeNumber.get(serverId);
      if (cn1 != null)
      CSN csn1 = ss1.serverIdToCSN.get(serverId);
      if (csn1 != null)
      {
        ChangeNumber cn2 = ss2.serverIdToChangeNumber.get(serverId);
         if (cn2 != null)
         {
           diff += ChangeNumber.diffSeqNum(cn1, cn2);
         } else {
           // ss2 does not have a change for this server id but ss1, so the
           // server holding ss1 has every changes represented in cn1 in advance
           // compared to server holding ss2, add this amount
           diff += cn1.getSeqnum();
         }
        CSN csn2 = ss2.serverIdToCSN.get(serverId);
        if (csn2 != null)
        {
          diff += CSN.diffSeqNum(csn1, csn2);
        }
        else
        {
          // ss2 does not have a change for this server id but ss1, so the
          // server holding ss1 has every changes represented in csn1 in advance
          // compared to server holding ss2, add this amount
          diff += csn1.getSeqnum();
        }
      }
    }
@@ -534,23 +529,23 @@
  }
  /**
   * Build a copy of the ServerState with only ChangeNumbers older than
   * a specific ChangeNumber. This is used when building the initial
   * Build a copy of the ServerState with only CSNs older than
   * a specific CSN. This is used when building the initial
   * Cookie in the External Changelog, to cope with purged changes.
   * @param cn The ChangeNumber to compare the ServerState with
   * @return a copy of the ServerState which only contains the ChangeNumbers
   *         older than cn.
   * @param csn The CSN to compare the ServerState with
   * @return a copy of the ServerState which only contains the CSNs older than
   *         csn.
   */
  public ServerState duplicateOnlyOlderThan(ChangeNumber cn)
  public ServerState duplicateOnlyOlderThan(CSN csn)
  {
    ServerState newState = new ServerState();
    synchronized (serverIdToChangeNumber)
    synchronized (serverIdToCSN)
    {
      for (ChangeNumber change : serverIdToChangeNumber.values())
      for (CSN change : serverIdToCSN.values())
      {
        if (change.older(cn))
        if (change.older(csn))
        {
          newState.serverIdToChangeNumber.put(change.getServerId(), change);
          newState.serverIdToCSN.put(change.getServerId(), change);
        }
      }
    }