| | |
| | | */ |
| | | package org.opends.server.replication.server; |
| | | |
| | | import java.util.*; |
| | | import java.util.ArrayList; |
| | | import java.util.List; |
| | | import java.util.concurrent.atomic.AtomicBoolean; |
| | | |
| | | import org.opends.messages.Message; |
| | |
| | | * restart as usual |
| | | * load this change on the delayList |
| | | */ |
| | | NavigableSet<ReplicaDBCursor> sortedCursors = null; |
| | | ReplicaDBCursor cursor = null; |
| | | try |
| | | { |
| | | sortedCursors = collectAllCursorsWithChanges(); |
| | | |
| | | // fill the lateQueue |
| | | while (!sortedCursors.isEmpty() |
| | | && lateQueue.count() < 100 |
| | | && lateQueue.bytesCount() < 50000) |
| | | cursor = replicationServerDomain.getCursorFrom(serverState); |
| | | while (cursor.next() && isLateQueueBelowThreshold()) |
| | | { |
| | | lateQueue.add(nextOldestUpdateMsg(sortedCursors)); |
| | | lateQueue.add(cursor.getChange()); |
| | | } |
| | | } |
| | | finally |
| | | { |
| | | close(sortedCursors); |
| | | close(cursor); |
| | | } |
| | | |
| | | /* |
| | |
| | | return null; |
| | | } |
| | | |
| | | private UpdateMsg nextOldestUpdateMsg( |
| | | NavigableSet<ReplicaDBCursor> sortedCursors) |
| | | private boolean isLateQueueBelowThreshold() |
| | | { |
| | | /* |
| | | * The cursors are sorted based on the currentChange of each cursor to |
| | | * consider the next change across all servers. |
| | | * To keep the order of the cursors in the SortedSet consistent, a cursor |
| | | * must be removed from the set before it is moved forward, and it is added |
| | | * back afterwards only if it still has a change. |
| | | */ |
| | | final ReplicaDBCursor cursor = sortedCursors.pollFirst(); |
| | | final UpdateMsg result = cursor.getChange(); |
| | | cursor.next(); |
| | | addCursorIfNotEmpty(sortedCursors, cursor); |
| | | return result; |
| | | } |
| | | |
| | | private void addCursorIfNotEmpty(Collection<ReplicaDBCursor> cursors, |
| | | ReplicaDBCursor cursor) |
| | | { |
| | | if (cursor.getChange() != null) |
| | | { |
| | | cursors.add(cursor); |
| | | } |
| | | else |
| | | { |
| | | close(cursor); |
| | | } |
| | | return lateQueue.count() < 100 && lateQueue.bytesCount() < 50000; |
| | | } |
| | | |
| | | /** |
| | |
| | | |
| | | private CSN findOldestCSNFromReplicaDBs() |
| | | { |
| | | SortedSet<ReplicaDBCursor> sortedCursors = null; |
| | | ReplicaDBCursor cursor = null; |
| | | try |
| | | { |
| | | sortedCursors = collectAllCursorsWithChanges(); |
| | | UpdateMsg msg = sortedCursors.first().getChange(); |
| | | return msg.getCSN(); |
| | | cursor = replicationServerDomain.getCursorFrom(serverState); |
| | | cursor.next(); |
| | | return cursor.getChange().getCSN(); |
| | | } |
| | | catch (Exception e) |
| | | { |
| | |
| | | } |
| | | finally |
| | | { |
| | | close(sortedCursors); |
| | | close(cursor); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Collects one {@link ReplicaDBCursor} per server id known to the |
| | | * replication server domain, keeping only the cursors that currently have a |
| | | * change. Each cursor is requested starting from the {@link CSN} recorded in |
| | | * {@code serverState} for that server id. A cursor without a change is |
| | | * closed by {@code addCursorIfNotEmpty} and is not part of the result. |
| | | * <p> |
| | | * The result is a {@link TreeSet} built without an explicit comparator, so |
| | | * the cursors are ordered by the natural ordering of {@link ReplicaDBCursor} |
| | | * (presumably oldest {@link CSN} first - confirm against its compareTo). |
| | | * <p> |
| | | * The returned cursors are still open; the callers in this file close the |
| | | * whole set in a finally block. |
| | | * |
| | | * @return a NavigableSet of the cursors that have changes, in the sorted |
| | | * order described above; empty if no cursor has a change |
| | | */ |
| | | private NavigableSet<ReplicaDBCursor> collectAllCursorsWithChanges() |
| | | { |
| | | final NavigableSet<ReplicaDBCursor> results = |
| | | new TreeSet<ReplicaDBCursor>(); |
| | | for (int serverId : replicationServerDomain.getServerIds()) |
| | | { |
| | | // serverState holds the last CSN already sent for this server: open the |
| | | // cursor from there so that only later changes are considered |
| | | final CSN lastCsn = serverState.getCSN(serverId); |
| | | addCursorIfNotEmpty(results, |
| | | replicationServerDomain.getCursorFrom(serverId, lastCsn)); |
| | | } |
| | | return results; |
| | | } |
| | | |
| | | /** |
| | | * Get the count of updates sent to this server. |
| | | * @return The count of update sent to this server. |
| | | */ |