| | |
| | | * restart as usual |
| | | * load this change on the delayList |
| | | */ |
| | | NavigableSet<ReplicationDBCursor> sortedCursors = null; |
| | | NavigableSet<ReplicaDBCursor> sortedCursors = null; |
| | | try |
| | | { |
| | | sortedCursors = collectAllCursorsWithChanges(); |
| | |
| | | } |
| | | |
| | | private UpdateMsg nextOldestUpdateMsg( |
| | | NavigableSet<ReplicationDBCursor> sortedCursors) |
| | | NavigableSet<ReplicaDBCursor> sortedCursors) |
| | | { |
| | | /* |
| | | * The cursors are sorted based on the currentChange of each cursor to |
| | |
| | | * it is necessary to remove and eventually add again a cursor (after moving |
| | | * it forward). |
| | | */ |
| | | final ReplicationDBCursor cursor = sortedCursors.pollFirst(); |
| | | final ReplicaDBCursor cursor = sortedCursors.pollFirst(); |
| | | final UpdateMsg result = cursor.getChange(); |
| | | cursor.next(); |
| | | addCursorIfNotEmpty(sortedCursors, cursor); |
| | | return result; |
| | | } |
| | | |
| | | private void addCursorIfNotEmpty(Collection<ReplicationDBCursor> cursors, |
| | | ReplicationDBCursor cursor) |
| | | private void addCursorIfNotEmpty(Collection<ReplicaDBCursor> cursors, |
| | | ReplicaDBCursor cursor) |
| | | { |
| | | if (cursor != null) |
| | | { |
| | |
| | | the lateQueue when it will send the next update but we are not yet |
| | | there. So let's take the last change not sent directly from the db. |
| | | */ |
| | | result = findOldestChangeNumberFromReplicationDBs(); |
| | | result = findOldestChangeNumberFromReplicaDBs(); |
| | | } |
| | | } |
| | | } |
| | | return result; |
| | | } |
| | | |
| | | private ChangeNumber findOldestChangeNumberFromReplicationDBs() |
| | | private ChangeNumber findOldestChangeNumberFromReplicaDBs() |
| | | { |
| | | SortedSet<ReplicationDBCursor> sortedCursors = null; |
| | | SortedSet<ReplicaDBCursor> sortedCursors = null; |
| | | try |
| | | { |
| | | sortedCursors = collectAllCursorsWithChanges(); |
| | |
| | | } |
| | | |
| | | /** |
| | | * Collects all the replication DB cursors that have changes and sort them |
| | | * Collects all the {@link ReplicaDBCursor}s that have changes and sorts them |
| | | * with the oldest {@link ChangeNumber} first. |
| | | * |
| | | * @return a sorted set of cursors with changes, ordered by their {@link ChangeNumber} |
| | | * (oldest first) |
| | | */ |
| | | private NavigableSet<ReplicationDBCursor> collectAllCursorsWithChanges() |
| | | private NavigableSet<ReplicaDBCursor> collectAllCursorsWithChanges() |
| | | { |
| | | final NavigableSet<ReplicationDBCursor> results = |
| | | new TreeSet<ReplicationDBCursor>(new ReplicationDBCursorComparator()); |
| | | final NavigableSet<ReplicaDBCursor> results = |
| | | new TreeSet<ReplicaDBCursor>(new ReplicaDBCursorComparator()); |
| | | for (int serverId : replicationServerDomain.getServerIds()) |
| | | { |
| | | // get the last already sent CN from that server to get a cursor |