| | |
| | | */ |
| | | package org.opends.server.replication.server; |
| | | |
| | | import static org.opends.messages.ReplicationMessages.*; |
| | | import static org.opends.server.loggers.ErrorLogger.logError; |
| | | |
| | | import java.util.SortedMap; |
| | | import java.util.TreeMap; |
| | | |
| | | import org.opends.messages.Message; |
| | | import org.opends.server.replication.common.ChangeNumber; |
| | | import org.opends.server.replication.common.CSN; |
| | | import org.opends.server.replication.protocol.UpdateMsg; |
| | | |
| | | import static org.opends.messages.ReplicationMessages.*; |
| | | import static org.opends.server.loggers.ErrorLogger.*; |
| | | |
| | | /** |
| | | * This class is used to build ordered lists of UpdateMsg. |
| | | * The order is defined by the order of the ChangeNumber of the UpdateMsg. |
| | | * The order is defined by the order of the CSN of the UpdateMsg. |
| | | */ |
| | | |
| | | public class MsgQueue |
| | | { |
| | | private SortedMap<ChangeNumber, UpdateMsg> map = |
| | | new TreeMap<ChangeNumber, UpdateMsg>(); |
| | | private SortedMap<CSN, UpdateMsg> map = new TreeMap<CSN, UpdateMsg>(); |
| | | private final Object lock = new Object(); |
| | | |
| | | // The total number of bytes for all the message in the queue. |
| | | /** The total number of bytes for all the message in the queue. */ |
| | | private int bytesCount = 0; |
| | | |
| | | /** |
| | |
| | | { |
| | | synchronized (lock) |
| | | { |
| | | UpdateMsg msgSameChangeNumber = map.put(update.getChangeNumber(), update); |
| | | if (msgSameChangeNumber != null) |
| | | UpdateMsg msgSameCSN = map.put(update.getCSN(), update); |
| | | if (msgSameCSN != null) |
| | | { |
| | | try |
| | | { |
| | | if (msgSameChangeNumber.getBytes().length != update.getBytes().length |
| | | || msgSameChangeNumber.isAssured() != update.isAssured() |
| | | || msgSameChangeNumber.getVersion() != update.getVersion()) |
| | | if (msgSameCSN.getBytes().length != update.getBytes().length |
| | | || msgSameCSN.isAssured() != update.isAssured() |
| | | || msgSameCSN.getVersion() != update.getVersion()) |
| | | { |
| | | // Adding 2 msgs with the same ChangeNumber is ok only when |
| | | // Adding 2 msgs with the same CSN is ok only when |
| | | // the 2 msgs are the same |
| | | bytesCount += (update.size() - msgSameChangeNumber.size()); |
| | | bytesCount += (update.size() - msgSameCSN.size()); |
| | | Message errMsg = ERR_RSQUEUE_DIFFERENT_MSGS_WITH_SAME_CN.get( |
| | | msgSameChangeNumber.getChangeNumber().toString(), |
| | | msgSameChangeNumber.toString(), update.toString()); |
| | | msgSameCSN.getCSN().toString(), |
| | | msgSameCSN.toString(), update.toString()); |
| | | logError(errMsg); |
| | | } |
| | | } |
| | |
| | | synchronized (lock) |
| | | { |
| | | UpdateMsg update = map.get(map.firstKey()); |
| | | map.remove(update.getChangeNumber()); |
| | | map.remove(update.getCSN()); |
| | | bytesCount -= update.size(); |
| | | if ((map.size() == 0) && (bytesCount != 0)) |
| | | { |
| | |
| | | |
| | | /** |
| | | * Returns <tt>true</tt> if this map contains an UpdateMsg |
| | | * with the same ChangeNumber as the given UpdateMsg. |
| | | * with the same CSN as the given UpdateMsg. |
| | | * |
| | | * @param msg UpdateMsg whose presence in this queue is to be tested. |
| | | * |
| | | * @return <tt>true</tt> if this map contains an UpdateMsg |
| | | * with the same ChangeNumber as the given UpdateMsg. |
| | | * with the same CSN as the given UpdateMsg. |
| | | * |
| | | */ |
| | | public boolean contains(UpdateMsg msg) |
| | | { |
| | | synchronized (lock) |
| | | { |
| | | return map.containsKey(msg.getChangeNumber()); |
| | | return map.containsKey(msg.getCSN()); |
| | | } |
| | | } |
| | | |