| | |
| | | * |
| | | * |
| | | * Copyright 2006-2010 Sun Microsystems, Inc. |
| | | * Portions copyright 2012-2013 ForgeRock AS. |
| | | * Portions copyright 2012-2014 ForgeRock AS. |
| | | */ |
| | | package org.opends.server.replication.server; |
| | | |
| | | import java.util.NavigableMap; |
| | | import java.util.TreeMap; |
| | | |
| | | import org.opends.messages.Message; |
| | | import org.opends.server.replication.common.CSN; |
| | | import org.opends.server.replication.protocol.UpdateMsg; |
| | | |
| | |
| | | /** |
| | | * This class is used to build ordered lists of UpdateMsg. |
| | | * The order is defined by the order of the CSN of the UpdateMsg. |
| | | * @ThreadSafe |
| | | */ |
| | | public class MsgQueue |
| | | { |
| | | private NavigableMap<CSN, UpdateMsg> map = new TreeMap<CSN, UpdateMsg>(); |
| | | private TreeMap<CSN, UpdateMsg> map = new TreeMap<CSN, UpdateMsg>(); |
| | | /** |
| | | * FIXME JNR to be investigated: |
| | | * I strongly suspect that we could replace this field |
| | | * by using the synchronized keyword on each method. |
| | | * However, MessageHandler is weirdly synchronizing on msgQueue field |
| | | * even though it is touching the lateQueue field (?!?). |
| | | */ |
| | | private final Object lock = new Object(); |
| | | |
| | | /** The total number of bytes for all the messages in the queue. */ |
| | |
| | | { |
| | | synchronized (lock) |
| | | { |
| | | UpdateMsg msgSameCSN = map.put(update.getCSN(), update); |
| | | final UpdateMsg msgSameCSN = map.put(update.getCSN(), update); |
| | | if (msgSameCSN != null) |
| | | { |
| | | try |
| | |
| | | { |
| | | // Adding 2 msgs with the same CSN is ok only when |
| | | // the 2 msgs are the same |
| | | bytesCount += (update.size() - msgSameCSN.size()); |
| | | Message errMsg = ERR_RSQUEUE_DIFFERENT_MSGS_WITH_SAME_CN.get( |
| | | bytesCount += update.size() - msgSameCSN.size(); |
| | | logError(ERR_RSQUEUE_DIFFERENT_MSGS_WITH_SAME_CN.get( |
| | | msgSameCSN.getCSN().toString(), |
| | | msgSameCSN.toString(), update.toString()); |
| | | logError(errMsg); |
| | | msgSameCSN.toString(), update.toString())); |
| | | } |
| | | } |
| | | catch(Exception e) |
| | | catch (Exception e) |
| | | {} |
| | | } |
| | | else |
| | |
| | | { |
| | | synchronized (lock) |
| | | { |
| | | UpdateMsg update = map.get(map.firstKey()); |
| | | // FIXME JNR replace next 2 lines with just that one: |
| | | // final UpdateMsg update = map.pollFirstEntry().getValue(); |
| | | final UpdateMsg update = map.get(map.firstKey()); |
| | | map.remove(update.getCSN()); |
| | | bytesCount -= update.size(); |
| | | if ((map.size() == 0) && (bytesCount != 0)) |
| | | if (map.isEmpty() && bytesCount != 0) |
| | | { |
| | | // should never happen |
| | | Message msg = ERR_BYTE_COUNT.get(Integer.toString(bytesCount)); |
| | | logError(msg); |
| | | logError(ERR_BYTE_COUNT.get(Integer.toString(bytesCount))); |
| | | bytesCount = 0; |
| | | } |
| | | return update; |
| | |
| | | * message. If the passed in message is not contained in the current queue, |
| | | * then all messages will be removed from it. |
| | | * |
| | | * @param msg |
| | | * @param finalMsg |
| | | * the final message to reach when consuming messages from this queue |
| | | */ |
| | | public void consumeUpTo(UpdateMsg msg) |
| | | public void consumeUpTo(UpdateMsg finalMsg) |
| | | { |
| | | UpdateMsg msg1; |
| | | // FIXME this code could be more efficient if the msgQueue could call the |
| | | // following code (to be tested): |
| | | // if (!map.containsKey(finalMsg.getCSN())) { |
| | | // map.clear(); |
| | | // } else { |
| | | // map.headMap(finalMsg.getCSN(), true).clear(); |
| | | // } |
| | | |
| | | final CSN finalCSN = finalMsg.getCSN(); |
| | | UpdateMsg msg; |
| | | do |
| | | { |
| | | // FIXME this code could be more efficient if the msgQueue could call the |
| | | // following code (to be tested): |
| | | // map.headMap(msg.getCSN(), true).clear() |
| | | msg1 = removeFirst(); |
| | | } while (!msg.getCSN().equals(msg1.getCSN())); |
| | | msg = removeFirst(); |
| | | } |
| | | while (!finalCSN.equals(msg.getCSN())); |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public String toString() |
| | | { |
| | | return getClass().getSimpleName() + " bytesCount=" + bytesCount + " queue=" |
| | | + map.values(); |
| | | } |
| | | } |