| | |
| | | import org.opends.server.types.ByteString; |
| | | |
| | | /** |
| | | * This class is used to associate serverIds with ChangeNumbers. |
| | | * This class is used to associate serverIds with {@link CSN}s. |
| | | * <p> |
| | | * For example, it is exchanged with the replication servers at connection |
| | | * establishment time to communicate |
| | | * "which ChangeNumbers last seen by a serverId" |
| | | * establishment time to communicate "which CSNs were last seen by a serverId". |
| | | */ |
| | | public class ServerState implements Iterable<Integer> |
| | | { |
| | | |
| | | /** Associates a serverId with a ChangeNumber. */ |
| | | private final Map<Integer, ChangeNumber> serverIdToChangeNumber = |
| | | new HashMap<Integer, ChangeNumber>(); |
| | | /** Associates a serverId with a CSN. */ |
| | | private final Map<Integer, CSN> serverIdToCSN = new HashMap<Integer, CSN>(); |
| | | /** |
| | | * Whether the state has been saved to persistent storage. It starts at true, |
| | | * and moves to false when an update is made to the current object. |
| | |
| | | */ |
| | | public void clear() |
| | | { |
| | | synchronized (serverIdToChangeNumber) |
| | | synchronized (serverIdToCSN) |
| | | { |
| | | serverIdToChangeNumber.clear(); |
| | | serverIdToCSN.clear(); |
| | | } |
| | | } |
| | | |
| | |
| | | { |
| | | while (endpos > pos) |
| | | { |
| | | // FIXME JNR: why store the serverId separately from the changeNumber |
| | | // since the changeNumber already contains the serverId? |
| | | // FIXME JNR: why store the serverId separately from the CSN since the |
| | | // CSN already contains the serverId? |
| | | |
| | | // read the ServerId |
| | | int length = getNextLength(in, pos); |
| | |
| | | int serverId = Integer.valueOf(serverIdString); |
| | | pos += length +1; |
| | | |
| | | // read the ChangeNumber |
| | | // read the CSN |
| | | length = getNextLength(in, pos); |
| | | String cnString = new String(in, pos, length, "UTF-8"); |
| | | ChangeNumber cn = new ChangeNumber(cnString); |
| | | String csnString = new String(in, pos, length, "UTF-8"); |
| | | CSN csn = new CSN(csnString); |
| | | pos += length +1; |
| | | |
| | | // Add the serverId |
| | | serverIdToChangeNumber.put(serverId, cn); |
| | | serverIdToCSN.put(serverId, csn); |
| | | } |
| | | } catch (UnsupportedEncodingException e) |
| | | { |
| | |
| | | |
| | | /** |
| | | * Get the length of the next String encoded in the {@code in} byte array. |
| | | * This method is used to cut the different parts (server ids, change number) |
| | | * This method is used to cut the different parts (serverIds, CSN) |
| | | * of a server state. |
| | | * |
| | | * @param in the byte array where to calculate the string. |
| | |
| | | } |
| | | |
| | | /** |
| | | * Update the Server State with a ChangeNumber. |
| | | * Update the Server State with a CSN. |
| | | * |
| | | * @param changeNumber The committed ChangeNumber. |
| | | * |
| | | * @param csn The committed CSN. |
| | | * @return a boolean indicating if the update was meaningful. |
| | | */ |
| | | public boolean update(ChangeNumber changeNumber) |
| | | public boolean update(CSN csn) |
| | | { |
| | | if (changeNumber == null) |
| | | if (csn == null) |
| | | return false; |
| | | |
| | | saved = false; |
| | | |
| | | synchronized (serverIdToChangeNumber) |
| | | synchronized (serverIdToCSN) |
| | | { |
| | | int serverId = changeNumber.getServerId(); |
| | | ChangeNumber oldCN = serverIdToChangeNumber.get(serverId); |
| | | if (oldCN == null || changeNumber.newer(oldCN)) |
| | | int serverId = csn.getServerId(); |
| | | CSN oldCSN = serverIdToCSN.get(serverId); |
| | | if (oldCSN == null || csn.newer(oldCSN)) |
| | | { |
| | | serverIdToChangeNumber.put(serverId, changeNumber); |
| | | serverIdToCSN.put(serverId, csn); |
| | | return true; |
| | | } |
| | | return false; |
| | |
| | | } |
| | | |
| | | /** |
| | | * Update the Server State with a Server State. Every change number of this |
| | | * object is updated with the change number of the passed server state if |
| | | * it is newer. |
| | | * Update the Server State with a Server State. Every CSN of this object is |
| | | * updated with the CSN of the passed server state if it is newer. |
| | | * |
| | | * @param serverState the server state to use for the update. |
| | | * |
| | | * @return a boolean indicating if the update was meaningful. |
| | | */ |
| | | public boolean update(ServerState serverState) |
| | |
| | | return false; |
| | | |
| | | boolean updated = false; |
| | | for (ChangeNumber cn : serverState.serverIdToChangeNumber.values()) |
| | | for (CSN csn : serverState.serverIdToCSN.values()) |
| | | { |
| | | if (update(cn)) |
| | | if (update(csn)) |
| | | { |
| | | updated = true; |
| | | } |
| | |
| | | return false; |
| | | } |
| | | |
| | | synchronized (serverIdToChangeNumber) |
| | | synchronized (serverIdToCSN) |
| | | { |
| | | clear(); |
| | | return update(serverState); |
| | |
| | | { |
| | | Set<String> set = new HashSet<String>(); |
| | | |
| | | synchronized (serverIdToChangeNumber) |
| | | synchronized (serverIdToCSN) |
| | | { |
| | | for (ChangeNumber change : serverIdToChangeNumber.values()) |
| | | for (CSN change : serverIdToCSN.values()) |
| | | { |
| | | Date date = new Date(change.getTime()); |
| | | set.add(change + " " + date + " " + change.getTime()); |
| | |
| | | } |
| | | |
| | | /** |
| | | * Return an ArrayList of ANS1OctetString encoding the ChangeNumbers |
| | | * Return an ArrayList of ASN1OctetString encoding the CSNs |
| | | * contained in the ServerState. |
| | | * @return an ArrayList of ANS1OctetString encoding the ChangeNumbers |
| | | * @return an ArrayList of ASN1OctetString encoding the CSNs |
| | | * contained in the ServerState. |
| | | */ |
| | | public ArrayList<ByteString> toASN1ArrayList() |
| | | { |
| | | ArrayList<ByteString> values = new ArrayList<ByteString>(0); |
| | | |
| | | synchronized (serverIdToChangeNumber) |
| | | synchronized (serverIdToCSN) |
| | | { |
| | | for (ChangeNumber changeNumber : serverIdToChangeNumber.values()) |
| | | for (CSN csn : serverIdToCSN.values()) |
| | | { |
| | | values.add(ByteString.valueOf(changeNumber.toString())); |
| | | values.add(ByteString.valueOf(csn.toString())); |
| | | } |
| | | } |
| | | return values; |
| | |
| | | public void writeTo(ASN1Writer writer, short protocolVersion) |
| | | throws IOException |
| | | { |
| | | synchronized (serverIdToChangeNumber) |
| | | synchronized (serverIdToCSN) |
| | | { |
| | | if (protocolVersion >= ProtocolVersion.REPLICATION_PROTOCOL_V7) |
| | | { |
| | | for (ChangeNumber cn : serverIdToChangeNumber.values()) |
| | | for (CSN csn : serverIdToCSN.values()) |
| | | { |
| | | writer.writeOctetString(cn.toByteString()); |
| | | writer.writeOctetString(csn.toByteString()); |
| | | } |
| | | } |
| | | else |
| | | { |
| | | for (ChangeNumber cn : serverIdToChangeNumber.values()) |
| | | for (CSN csn : serverIdToCSN.values()) |
| | | { |
| | | writer.writeOctetString(cn.toString()); |
| | | writer.writeOctetString(csn.toString()); |
| | | } |
| | | } |
| | | } |
| | |
| | | { |
| | | StringBuilder buffer = new StringBuilder(); |
| | | |
| | | synchronized (serverIdToChangeNumber) |
| | | synchronized (serverIdToCSN) |
| | | { |
| | | for (ChangeNumber change : serverIdToChangeNumber.values()) |
| | | for (CSN change : serverIdToCSN.values()) |
| | | { |
| | | buffer.append(change).append(" "); |
| | | } |
| | | if (!serverIdToChangeNumber.isEmpty()) |
| | | if (!serverIdToCSN.isEmpty()) |
| | | buffer.deleteCharAt(buffer.length() - 1); |
| | | } |
| | | |
| | |
| | | } |
| | | |
| | | /** |
| | | * Returns the {@code ChangeNumber} contained in this server state which |
| | | * corresponds to the provided server ID. |
| | | * Returns the {@code CSN} contained in this server state which corresponds to |
| | | * the provided server ID. |
| | | * <p> |
| | | * This is a plain lookup in the serverId map; nothing is added or modified. |
| | | * <p> |
| | | * NOTE(review): the lookup is not wrapped in a synchronized block on the |
| | | * map, whereas the methods of this class that iterate or modify the map |
| | | * are - confirm that this is intentional. |
| | | * |
| | | * @param serverId |
| | | * The server ID. |
| | | * @return The {@code ChangeNumber} contained in this server state which |
| | | * @return The {@code CSN} contained in this server state which |
| | | * corresponds to the provided server ID, or {@code null} when the map |
| | | * holds no entry for that server ID. |
| | | */ |
| | | public ChangeNumber getChangeNumber(int serverId) |
| | | public CSN getCSN(int serverId) |
| | | { |
| | | return serverIdToChangeNumber.get(serverId); |
| | | return serverIdToCSN.get(serverId); |
| | | } |
| | | |
| | | /** |
| | | * Returns the largest (most recent) {@code ChangeNumber} in this server |
| | | * state. |
| | | * Returns the largest (most recent) {@code CSN} in this server state. |
| | | * <p> |
| | | * Every value of the serverId map is visited while holding the monitor of |
| | | * the map; a value replaces the running maximum when there is no maximum |
| | | * yet or when its {@code newer(...)} check against the current maximum |
| | | * succeeds. |
| | | * |
| | | * @return The largest (most recent) {@code ChangeNumber} in this server |
| | | * state. |
| | | * @return The largest (most recent) {@code CSN} in this server state, or |
| | | * {@code null} when the map is empty (the running maximum starts at |
| | | * {@code null} and is returned unchanged in that case). |
| | | */ |
| | | public ChangeNumber getMaxChangeNumber() |
| | | public CSN getMaxCSN() |
| | | { |
| | | ChangeNumber maxCN = null; |
| | | CSN maxCSN = null; |
| | | |
| | | synchronized (serverIdToChangeNumber) |
| | | synchronized (serverIdToCSN) |
| | | { |
| | | for (ChangeNumber tmpMax : serverIdToChangeNumber.values()) |
| | | for (CSN csn : serverIdToCSN.values()) |
| | | { |
| | | if (maxCN == null || tmpMax.newer(maxCN)) |
| | | maxCN = tmpMax; |
| | | if (maxCSN == null || csn.newer(maxCSN)) |
| | | maxCSN = csn; |
| | | } |
| | | } |
| | | return maxCN; |
| | | return maxCSN; |
| | | } |
| | | |
| | | /** |
| | |
| | | */ |
| | | public byte[] getBytes() throws UnsupportedEncodingException |
| | | { |
| | | synchronized (serverIdToChangeNumber) |
| | | synchronized (serverIdToCSN) |
| | | { |
| | | final int size = serverIdToChangeNumber.size(); |
| | | final int size = serverIdToCSN.size(); |
| | | List<String> idList = new ArrayList<String>(size); |
| | | List<String> cnList = new ArrayList<String>(size); |
| | | List<String> csnList = new ArrayList<String>(size); |
| | | // calculate the total length needed to allocate byte array |
| | | int length = 0; |
| | | for (Entry<Integer, ChangeNumber> entry : serverIdToChangeNumber |
| | | for (Entry<Integer, CSN> entry : serverIdToCSN |
| | | .entrySet()) |
| | | { |
| | | // serverId is useless, see comment in ServerState ctor |
| | |
| | | idList.add(serverIdStr); |
| | | length += serverIdStr.length() + 1; |
| | | |
| | | String changeNumberStr = entry.getValue().toString(); |
| | | cnList.add(changeNumberStr); |
| | | length += changeNumberStr.length() + 1; |
| | | String csnStr = entry.getValue().toString(); |
| | | csnList.add(csnStr); |
| | | length += csnStr.length() + 1; |
| | | } |
| | | byte[] result = new byte[length]; |
| | | |
| | |
| | | { |
| | | String str = idList.get(i); |
| | | pos = addByteArray(str.getBytes("UTF-8"), result, pos); |
| | | str = cnList.get(i); |
| | | str = csnList.get(i); |
| | | pos = addByteArray(str.getBytes("UTF-8"), result, pos); |
| | | } |
| | | return result; |
| | |
| | | @Override |
| | | public Iterator<Integer> iterator() |
| | | { |
| | | return serverIdToChangeNumber.keySet().iterator(); |
| | | return serverIdToCSN.keySet().iterator(); |
| | | } |
| | | |
| | | /** |
| | | * Check that all the ChangeNumbers in the covered serverState are also in |
| | | * this serverState. |
| | | * Check that all the CSNs in the covered serverState are also in this |
| | | * serverState. |
| | | * |
| | | * @param covered The ServerState that needs to be checked. |
| | | * @return A boolean indicating if this ServerState covers the ServerState |
| | |
| | | */ |
| | | public boolean cover(ServerState covered) |
| | | { |
| | | for (ChangeNumber coveredChange : covered.serverIdToChangeNumber.values()) |
| | | for (CSN coveredChange : covered.serverIdToCSN.values()) |
| | | { |
| | | if (!cover(coveredChange)) |
| | | { |
| | |
| | | } |
| | | |
| | | /** |
| | | * Checks that the ChangeNumber given as a parameter is in this ServerState. |
| | | * Checks that the CSN given as a parameter is in this ServerState. |
| | | * <p> |
| | | * The entry stored for the server ID of {@code covered} is looked up; the |
| | | * result is {@code true} only when such an entry exists and its |
| | | * {@code older(covered)} check fails, i.e. the stored entry is not older |
| | | * than the one passed in. |
| | | * |
| | | * @param covered The ChangeNumber that should be checked. |
| | | * @return A boolean indicating if this ServerState contains the ChangeNumber |
| | | * given in parameter. |
| | | * @param covered The CSN that should be checked. It is dereferenced without |
| | | * a null check, so it must not be {@code null}. |
| | | * @return A boolean indicating if this ServerState contains the CSN given in |
| | | * parameter; {@code false} when no entry is stored for its server ID. |
| | | */ |
| | | public boolean cover(ChangeNumber covered) |
| | | public boolean cover(CSN covered) |
| | | { |
| | | ChangeNumber change = |
| | | this.serverIdToChangeNumber.get(covered.getServerId()); |
| | | CSN change = |
| | | this.serverIdToCSN.get(covered.getServerId()); |
| | | return change != null && !change.older(covered); |
| | | } |
| | | |
| | |
| | | */ |
| | | public boolean isEmpty() |
| | | { |
| | | return serverIdToChangeNumber.isEmpty(); |
| | | return serverIdToCSN.isEmpty(); |
| | | } |
| | | |
| | | /** |
| | |
| | | public ServerState duplicate() |
| | | { |
| | | ServerState newState = new ServerState(); |
| | | synchronized (serverIdToChangeNumber) |
| | | synchronized (serverIdToCSN) |
| | | { |
| | | newState.serverIdToChangeNumber.putAll(serverIdToChangeNumber); |
| | | newState.serverIdToCSN.putAll(serverIdToCSN); |
| | | } |
| | | return newState; |
| | | } |
| | |
| | | } |
| | | |
| | | int diff = 0; |
| | | for (Integer serverId : ss1.serverIdToChangeNumber.keySet()) |
| | | for (Integer serverId : ss1.serverIdToCSN.keySet()) |
| | | { |
| | | ChangeNumber cn1 = ss1.serverIdToChangeNumber.get(serverId); |
| | | if (cn1 != null) |
| | | CSN csn1 = ss1.serverIdToCSN.get(serverId); |
| | | if (csn1 != null) |
| | | { |
| | | ChangeNumber cn2 = ss2.serverIdToChangeNumber.get(serverId); |
| | | if (cn2 != null) |
| | | { |
| | | diff += ChangeNumber.diffSeqNum(cn1, cn2); |
| | | } else { |
| | | // ss2 does not have a change for this server id but ss1, so the |
| | | // server holding ss1 has every changes represented in cn1 in advance |
| | | // compared to server holding ss2, add this amount |
| | | diff += cn1.getSeqnum(); |
| | | } |
| | | CSN csn2 = ss2.serverIdToCSN.get(serverId); |
| | | if (csn2 != null) |
| | | { |
| | | diff += CSN.diffSeqNum(csn1, csn2); |
| | | } |
| | | else |
| | | { |
| | | // ss2 has no change for this server id but ss1 does, so the server |
| | | // holding ss1 is ahead of the server holding ss2 by every change |
| | | // represented in csn1: add that amount |
| | | diff += csn1.getSeqnum(); |
| | | } |
| | | } |
| | | } |
| | | |
| | |
| | | } |
| | | |
| | | /** |
| | | * Build a copy of the ServerState with only ChangeNumbers older than |
| | | * a specific ChangeNumber. This is used when building the initial |
| | | * Build a copy of the ServerState with only CSNs older than |
| | | * a specific CSN. This is used when building the initial |
| | | * Cookie in the External Changelog, to cope with purged changes. |
| | | * @param cn The ChangeNumber to compare the ServerState with |
| | | * @return a copy of the ServerState which only contains the ChangeNumbers |
| | | * older than cn. |
| | | * @param csn The CSN to compare the ServerState with |
| | | * @return a copy of the ServerState which only contains the CSNs older than |
| | | * csn. |
| | | */ |
| | | public ServerState duplicateOnlyOlderThan(ChangeNumber cn) |
| | | public ServerState duplicateOnlyOlderThan(CSN csn) |
| | | { |
| | | ServerState newState = new ServerState(); |
| | | synchronized (serverIdToChangeNumber) |
| | | synchronized (serverIdToCSN) |
| | | { |
| | | for (ChangeNumber change : serverIdToChangeNumber.values()) |
| | | for (CSN change : serverIdToCSN.values()) |
| | | { |
| | | if (change.older(cn)) |
| | | if (change.older(csn)) |
| | | { |
| | | newState.serverIdToChangeNumber.put(change.getServerId(), change); |
| | | newState.serverIdToCSN.put(change.getServerId(), change); |
| | | } |
| | | } |
| | | } |