| | |
| | | |
| | | import java.util.ArrayList; |
| | | import java.util.List; |
| | | import java.util.Set; |
| | | import java.util.concurrent.atomic.AtomicBoolean; |
| | | |
| | | import org.opends.messages.Message; |
| | |
| | | * Get the next update that must be sent to the consumer |
| | | * from the message queue or from the database. |
| | | * |
| | | * @param synchronous specifies what to do when the queue is empty. |
| | | * @param connectedReplicaIds |
| | | * Ids of replicas to accept when returning a message from |
| | | * the late queue. |
| | | * @param synchronous specifies what to do when the queue is empty. |
| | | * when true, the method blocks; when false the method returns null. |
| | | * |
| | | * @return The next update that must be sent to the consumer. |
| | | * null when synchronous is false and queue is empty. |
| | | */ |
| | | protected UpdateMsg getNextMessage(boolean synchronous) |
| | | protected UpdateMsg getNextMessage(Set<Integer> connectedReplicaIds, boolean synchronous) |
| | | { |
| | | while (activeConsumer) |
| | | { |
| | |
| | | * restart as usual |
| | | * load this change on the delayList |
| | | */ |
| | | fillLateQueue(); |
| | | fillLateQueue(connectedReplicaIds); |
| | | if (lateQueue.isEmpty()) |
| | | { |
| | | // we could not find any messages in the changelog |
| | |
| | | return null; |
| | | } |
| | | |
| | | private void fillLateQueue() |
| | | /** |
| | | * Fills the late queue with the most recent changes, accepting only the |
| | | * messages from provided replica ids. |
| | | */ |
| | | private void fillLateQueue(Set<Integer> connectedReplicaIds) |
| | | { |
| | | DBCursor<UpdateMsg> cursor = null; |
| | | try |
| | |
| | | cursor = replicationServerDomain.getCursorFrom(serverState); |
| | | while (cursor.next() && isLateQueueBelowThreshold()) |
| | | { |
| | | lateQueue.add(cursor.getRecord()); |
| | | final UpdateMsg record = cursor.getRecord(); |
| | | if (connectedReplicaIds.contains(record.getCSN().getServerId())) |
| | | { |
| | | lateQueue.add(record); |
| | | } |
| | | } |
| | | } |
| | | catch (ChangelogException e) |