| | |
| | | import java.io.UnsupportedEncodingException; |
| | | import java.util.*; |
| | | import java.util.Map.Entry; |
| | | import java.util.concurrent.ConcurrentMap; |
| | | import java.util.concurrent.ConcurrentSkipListMap; |
| | | import java.util.zip.DataFormatException; |
| | | |
| | | import org.opends.server.protocols.asn1.ASN1Writer; |
| | | import org.opends.server.replication.protocol.ProtocolVersion; |
| | | import org.opends.server.types.ByteString; |
| | | import org.opends.server.util.StaticUtils; |
| | | |
| | | /** |
| | | * This class is used to associate serverIds with {@link CSN}s. |
| | |
| | | { |
| | | |
| | | /**
| | | * Associates a serverId with a CSN.
| | | * <p>
| | | * NOTE(review): two declarations of this field follow, one backed by a
| | | * {@code HashMap} and one typed {@code ConcurrentMap} and backed by a
| | | * {@code ConcurrentSkipListMap}. They look like two revisions of the same
| | | * line; presumably only one is meant to remain - TODO confirm.
| | | */ |
| | | private final Map<Integer, CSN> serverIdToCSN = new HashMap<Integer, CSN>(); |
| | | private final ConcurrentMap<Integer, CSN> serverIdToCSN = |
| | | new ConcurrentSkipListMap<Integer, CSN>(); |
| | | /** |
| | | * Whether the state has been saved to persistent storage. It starts at true, |
| | | * and moves to false when an update is made to the current object. |
| | |
| | | * <p> |
| | | * NOTE(review): the lines between the sentence above and the end of this |
| | | * comment are not visible in this view, so the sentence above may document a |
| | | * different member than {@code clear()} - confirm against the full file. |
| | | * <p> |
| | | * {@code clear()} empties {@code serverIdToCSN}. As written here the map is |
| | | * cleared twice: once while holding the monitor of {@code serverIdToCSN} and |
| | | * once more without it. The second call looks redundant and is presumably |
| | | * the other revision of the same statement - TODO confirm. |
| | | */ |
| | | public void clear() |
| | | { |
| | | synchronized (serverIdToCSN) |
| | | { |
| | | serverIdToCSN.clear(); |
| | | } |
| | | serverIdToCSN.clear(); |
| | | } |
| | | |
| | | |
| | |
| | | |
| | | saved = false; |
| | | |
| | | synchronized (serverIdToCSN) |
| | | final int serverId = csn.getServerId(); |
| | | while (true) |
| | | { |
| | | final int serverId = csn.getServerId(); |
| | | final CSN existingCSN = serverIdToCSN.get(serverId); |
| | | if (existingCSN == null || csn.isNewerThan(existingCSN)) |
| | | if (existingCSN == null) |
| | | { |
| | | serverIdToCSN.put(serverId, csn); |
| | | return true; |
| | | if (serverIdToCSN.putIfAbsent(serverId, csn) == null) |
| | | { |
| | | return true; |
| | | } |
| | | // oops, a concurrent modification happened, run the same process again |
| | | continue; |
| | | } |
| | | else if (csn.isNewerThan(existingCSN)) |
| | | { |
| | | if (serverIdToCSN.replace(serverId, existingCSN, csn)) |
| | | { |
| | | return true; |
| | | } |
| | | // oops, a concurrent modification happened, run the same process again |
| | | continue; |
| | | } |
| | | return false; |
| | | } |
| | |
| | | if (expectedCSN == null) |
| | | return false; |
| | | |
| | | synchronized (serverIdToCSN) |
| | | if (serverIdToCSN.remove(expectedCSN.getServerId(), expectedCSN)) |
| | | { |
| | | for (Iterator<CSN> iter = serverIdToCSN.values().iterator(); |
| | | iter.hasNext();) |
| | | { |
| | | final CSN csn = iter.next(); |
| | | if (expectedCSN.equals(csn)) |
| | | { |
| | | iter.remove(); |
| | | saved = false; |
| | | return true; |
| | | } |
| | | } |
| | | saved = false; |
| | | return true; |
| | | } |
| | | return false; |
| | | } |
| | |
| | | return false; |
| | | } |
| | | |
| | | synchronized (serverIdToCSN) |
| | | { |
| | | clear(); |
| | | return update(serverState); |
| | | } |
| | | clear(); |
| | | return update(serverState); |
| | | } |
| | | |
| | | /** |
| | |
| | | * Builds a set holding one descriptive string per CSN stored in |
| | | * {@code serverIdToCSN}. Each string is the CSN itself, a space, the |
| | | * {@code Date} built from {@code getTime()}, a space, and the raw |
| | | * {@code getTime()} value. |
| | | * <p> |
| | | * NOTE(review): this span interleaves two variants of the body. One fills |
| | | * {@code set} inside a {@code synchronized (serverIdToCSN)} block and returns |
| | | * {@code set}; the other fills {@code result} without locking and returns |
| | | * {@code result}. Presumably only one variant is meant to remain - TODO |
| | | * confirm. |
| | | * |
| | | * @return a newly created {@code HashSet} with one entry per CSN |
| | | */ |
| | | public Set<String> toStringSet() |
| | | { |
| | | Set<String> set = new HashSet<String>(); |
| | | |
| | | synchronized (serverIdToCSN) |
| | | final Set<String> result = new HashSet<String>(); |
| | | for (CSN change : serverIdToCSN.values()) |
| | | { |
| | | for (CSN change : serverIdToCSN.values()) |
| | | { |
| | | Date date = new Date(change.getTime()); |
| | | set.add(change + " " + date + " " + change.getTime()); |
| | | } |
| | | Date date = new Date(change.getTime()); |
| | | result.add(change + " " + date + " " + change.getTime()); |
| | | } |
| | | |
| | | return set; |
| | | return result; |
| | | } |
| | | |
| | | /** |
| | |
| | | * Converts every CSN stored in {@code serverIdToCSN} to a {@code ByteString} |
| | | * by passing its {@code toString()} form to {@code ByteString.valueOf}, and |
| | | * collects the results in a list. |
| | | * <p> |
| | | * NOTE(review): {@code values} is declared twice and the loop appears twice, |
| | | * once under {@code synchronized (serverIdToCSN)} and once without locking. |
| | | * These look like two variants of the same body; presumably only one is |
| | | * meant to remain - TODO confirm. |
| | | * |
| | | * @return a newly created list with one {@code ByteString} per CSN |
| | | */ |
| | | public ArrayList<ByteString> toASN1ArrayList() |
| | | { |
| | | ArrayList<ByteString> values = new ArrayList<ByteString>(0); |
| | | |
| | | synchronized (serverIdToCSN) |
| | | final ArrayList<ByteString> values = new ArrayList<ByteString>(0); |
| | | for (CSN csn : serverIdToCSN.values()) |
| | | { |
| | | for (CSN csn : serverIdToCSN.values()) |
| | | { |
| | | values.add(ByteString.valueOf(csn.toString())); |
| | | } |
| | | values.add(ByteString.valueOf(csn.toString())); |
| | | } |
| | | return values; |
| | | } |
| | |
| | | /** |
| | | * Writes every CSN stored in {@code serverIdToCSN} to the given writer as an |
| | | * octet string. When {@code protocolVersion} is at least |
| | | * {@code ProtocolVersion.REPLICATION_PROTOCOL_V7} the value written is |
| | | * {@code csn.toByteString()}; otherwise it is {@code csn.toString()}. |
| | | * <p> |
| | | * NOTE(review): the statements of two variants are interleaved below, one |
| | | * wrapped in {@code synchronized (serverIdToCSN)} and one without locking, so |
| | | * each test, loop and write appears twice. Presumably only one variant is |
| | | * meant to remain - TODO confirm. |
| | | * |
| | | * @param writer |
| | | *          the writer receiving one octet string per CSN |
| | | * @param protocolVersion |
| | | *          the protocol version that selects the encoding of each CSN |
| | | * @throws IOException |
| | | *           presumably when the writer fails - verify against ASN1Writer |
| | | */ |
| | | public void writeTo(ASN1Writer writer, short protocolVersion) |
| | | throws IOException |
| | | { |
| | | synchronized (serverIdToCSN) |
| | | if (protocolVersion >= ProtocolVersion.REPLICATION_PROTOCOL_V7) |
| | | { |
| | | if (protocolVersion >= ProtocolVersion.REPLICATION_PROTOCOL_V7) |
| | | for (CSN csn : serverIdToCSN.values()) |
| | | { |
| | | for (CSN csn : serverIdToCSN.values()) |
| | | { |
| | | writer.writeOctetString(csn.toByteString()); |
| | | } |
| | | writer.writeOctetString(csn.toByteString()); |
| | | } |
| | | else |
| | | } |
| | | else |
| | | { |
| | | for (CSN csn : serverIdToCSN.values()) |
| | | { |
| | | for (CSN csn : serverIdToCSN.values()) |
| | | { |
| | | writer.writeOctetString(csn.toString()); |
| | | } |
| | | writer.writeOctetString(csn.toString()); |
| | | } |
| | | } |
| | | } |
| | |
| | | /** |
| | | * Renders the CSNs stored in {@code serverIdToCSN} as a single string. |
| | | * <p> |
| | | * NOTE(review): two variants are interleaved below. The first appends each |
| | | * CSN followed by a space to a {@code StringBuilder} while holding the |
| | | * monitor of {@code serverIdToCSN}, drops the trailing space when the map is |
| | | * not empty, and returns the buffer. The second delegates to |
| | | * {@code StaticUtils.collectionToString} with a single-space argument, |
| | | * presumably the separator - verify. As written the second {@code return} |
| | | * can never be reached; presumably only one variant is meant to remain - |
| | | * TODO confirm. |
| | | * |
| | | * @return the string form of this server state |
| | | */ |
| | | @Override |
| | | public String toString() |
| | | { |
| | | StringBuilder buffer = new StringBuilder(); |
| | | |
| | | synchronized (serverIdToCSN) |
| | | { |
| | | for (CSN change : serverIdToCSN.values()) |
| | | { |
| | | buffer.append(change).append(" "); |
| | | } |
| | | if (!serverIdToCSN.isEmpty()) |
| | | buffer.deleteCharAt(buffer.length() - 1); |
| | | } |
| | | |
| | | return buffer.toString(); |
| | | return StaticUtils.collectionToString(serverIdToCSN.values(), " "); |
| | | } |
| | | |
| | | /** |
| | |
| | | } |
| | | |
| | | /** |
| | | * Returns the largest (most recent) {@code CSN} in this server state. |
| | | * <p> |
| | | * Every value of {@code serverIdToCSN} is visited while holding the monitor |
| | | * of {@code serverIdToCSN}; a value replaces the current candidate whenever |
| | | * {@code isNewerThan} reports it as newer. |
| | | * |
| | | * @return The largest (most recent) {@code CSN} in this server state, or |
| | | *         {@code null} when {@code serverIdToCSN} holds no value. |
| | | */ |
| | | public CSN getMaxCSN() |
| | | { |
| | | CSN maxCSN = null; |
| | | |
| | | synchronized (serverIdToCSN) |
| | | { |
| | | for (CSN csn : serverIdToCSN.values()) |
| | | { |
| | | if (maxCSN == null || csn.isNewerThan(maxCSN)) |
| | | maxCSN = csn; |
| | | } |
| | | } |
| | | return maxCSN; |
| | | } |
| | | |
| | | /** |
| | | * Add the tail into resultByteArray at position pos. |
| | | */ |
| | | private int addByteArray(byte[] tail, byte[] resultByteArray, int pos) |
| | |
| | | */ |
| | | // Serialises the server state into a byte array. For every entry of the map |
| | | // the decimal form of the serverId and the toString() form of the CSN are |
| | | // collected, then both are encoded as UTF-8 and copied into the result with |
| | | // addByteArray, id first and CSN second. |
| | | // |
| | | // The array length reserves one extra byte per string (the "+ 1" below), |
| | | // presumably for a terminator written by addByteArray - verify there. |
| | | // The length is counted in chars (String.length()) while the bytes written |
| | | // are UTF-8; the two agree only for ASCII text, which these strings |
| | | // presumably always are - TODO confirm. |
| | | // |
| | | // NOTE(review): two variants are interleaved below. One iterates over a |
| | | // HashMap copy of serverIdToCSN taken up front; the other iterates over |
| | | // serverIdToCSN itself under synchronized (serverIdToCSN). Several |
| | | // declarations and both loops therefore appear twice; presumably only one |
| | | // variant is meant to remain - TODO confirm. |
| | | public byte[] getBytes() throws UnsupportedEncodingException |
| | | { |
| | | synchronized (serverIdToCSN) |
| | | // copy to protect from concurrent updates |
| | | // that could change the number of elements in the Map |
| | | final Map<Integer, CSN> copy = new HashMap<Integer, CSN>(serverIdToCSN); |
| | | |
| | | final int size = copy.size(); |
| | | List<String> idList = new ArrayList<String>(size); |
| | | List<String> csnList = new ArrayList<String>(size); |
| | | // calculate the total length needed to allocate byte array |
| | | int length = 0; |
| | | for (Entry<Integer, CSN> entry : copy.entrySet()) |
| | | { |
| | | final int size = serverIdToCSN.size(); |
| | | List<String> idList = new ArrayList<String>(size); |
| | | List<String> csnList = new ArrayList<String>(size); |
| | | // calculate the total length needed to allocate byte array |
| | | int length = 0; |
| | | for (Entry<Integer, CSN> entry : serverIdToCSN.entrySet()) |
| | | { |
| | | // serverId is useless, see comment in ServerState ctor |
| | | final String serverIdStr = String.valueOf(entry.getKey()); |
| | | idList.add(serverIdStr); |
| | | length += serverIdStr.length() + 1; |
| | | // serverId is useless, see comment in ServerState ctor |
| | | final String serverIdStr = String.valueOf(entry.getKey()); |
| | | idList.add(serverIdStr); |
| | | length += serverIdStr.length() + 1; |
| | | |
| | | final String csnStr = entry.getValue().toString(); |
| | | csnList.add(csnStr); |
| | | length += csnStr.length() + 1; |
| | | } |
| | | byte[] result = new byte[length]; |
| | | |
| | | // write the server state into the byte array |
| | | int pos = 0; |
| | | for (int i = 0; i < size; i++) |
| | | { |
| | | String str = idList.get(i); |
| | | pos = addByteArray(str.getBytes("UTF-8"), result, pos); |
| | | str = csnList.get(i); |
| | | pos = addByteArray(str.getBytes("UTF-8"), result, pos); |
| | | } |
| | | return result; |
| | | final String csnStr = entry.getValue().toString(); |
| | | csnList.add(csnStr); |
| | | length += csnStr.length() + 1; |
| | | } |
| | | byte[] result = new byte[length]; |
| | | |
| | | // write the server state into the byte array |
| | | int pos = 0; |
| | | for (int i = 0; i < size; i++) |
| | | { |
| | | String str = idList.get(i); |
| | | pos = addByteArray(str.getBytes("UTF-8"), result, pos); |
| | | str = csnList.get(i); |
| | | pos = addByteArray(str.getBytes("UTF-8"), result, pos); |
| | | } |
| | | return result; |
| | | } |
| | | |
| | | /** |
| | |
| | | * Creates a new {@code ServerState} and copies every serverId-to-CSN |
| | | * association of this object into it with {@code putAll}. |
| | | * <p> |
| | | * NOTE(review): {@code newState} is declared twice and {@code putAll} is |
| | | * called twice, once under {@code synchronized (serverIdToCSN)} and once |
| | | * without locking. These look like two variants of the same body; presumably |
| | | * only one is meant to remain - TODO confirm. |
| | | * |
| | | * @return the newly created {@code ServerState} |
| | | */ |
| | | public ServerState duplicate() |
| | | { |
| | | ServerState newState = new ServerState(); |
| | | synchronized (serverIdToCSN) |
| | | { |
| | | newState.serverIdToCSN.putAll(serverIdToCSN); |
| | | } |
| | | final ServerState newState = new ServerState(); |
| | | newState.serverIdToCSN.putAll(serverIdToCSN); |
| | | return newState; |
| | | } |
| | | |
| | |
| | | { |
| | | final CSN csn = new CSN(timestamp, 0, 0); |
| | | final ServerState newState = new ServerState(); |
| | | synchronized (serverIdToCSN) |
| | | for (CSN change : serverIdToCSN.values()) |
| | | { |
| | | for (CSN change : serverIdToCSN.values()) |
| | | if (change.isOlderThan(csn)) |
| | | { |
| | | if (change.isOlderThan(csn)) |
| | | { |
| | | newState.serverIdToCSN.put(change.getServerId(), change); |
| | | } |
| | | newState.serverIdToCSN.put(change.getServerId(), change); |
| | | } |
| | | } |
| | | return newState; |