| | |
| | | { |
| | | while (activeConsumer) |
| | | { |
| | | UpdateMsg msg; |
| | | if (!following) |
| | | { |
| | | /* this server is late with regard to some other masters |
| | |
| | | * restart as usual |
| | | * load this change on the delayList |
| | | */ |
| | | NavigableSet<ReplicationDBCursor> sortedCursors = null; |
| | | NavigableSet<ReplicationDBCursor> sortedCursors = null; |
| | | try |
| | | { |
| | | sortedCursors = collectAllCursorsWithChanges(); |
| | | |
| | | // fill the lateQueue |
| | | // fill the lateQueue |
| | | while (!sortedCursors.isEmpty() |
| | | && lateQueue.count() < 100 |
| | | && lateQueue.bytesCount() < 50000) |
| | | { |
| | | lateQueue.add(nextOldestUpdateMsg(sortedCursors)); |
| | | lateQueue.add(nextOldestUpdateMsg(sortedCursors)); |
| | | } |
| | | } |
| | | finally |
| | |
| | | following = true; |
| | | } |
| | | } |
| | | } |
| | | else |
| | | } |
| | | else |
| | | { |
| | | /* |
| | | * if the first change in the lateQueue is also on the regular |
| | | * queue, we can resume the processing from the regular queue |
| | | * -> set following to true and empty the lateQueue. |
| | | */ |
| | | msg = lateQueue.first(); |
| | | UpdateMsg msg = lateQueue.first(); |
| | | synchronized (msgQueue) |
| | | { |
| | | if (msgQueue.contains(msg)) |
| | |
| | | } |
| | | } |
| | | } |
| | | } |
| | | else |
| | | } |
| | | else |
| | | { |
| | | // get the next change from the lateQueue |
| | | // get the next change from the lateQueue |
| | | UpdateMsg msg; |
| | | synchronized (msgQueue) |
| | | { |
| | | msg = lateQueue.removeFirst(); |
| | |
| | | return msg; |
| | | } |
| | | } |
| | | |
| | | |
| | | synchronized (msgQueue) |
| | | { |
| | | if (following) |
| | |
| | | { |
| | | return null; |
| | | } |
| | | msg = msgQueue.removeFirst(); |
| | | |
| | | UpdateMsg msg = msgQueue.removeFirst(); |
| | | if (updateServerState(msg)) |
| | | { |
| | | /* |
| | |
| | | return null; |
| | | } |
| | | |
| | | private UpdateMsg nextOldestUpdateMsg( |
| | | NavigableSet<ReplicationDBCursor> sortedCursors) |
| | | { |
| | | /* |
| | | * The cursors are sorted based on the currentChange of each cursor to |
| | | * consider the next change across all servers. |
| | | * To keep consistent the order of the cursors in the SortedSet, |
| | | * it is necessary to remove and eventually add again a cursor (after moving |
| | | * it forward). |
| | | */ |
| | | final ReplicationDBCursor cursor = sortedCursors.pollFirst(); |
| | | final UpdateMsg result = cursor.getChange(); |
| | | cursor.next(); |
| | | addCursorIfNotEmpty(sortedCursors, cursor); |
| | | return result; |
| | | } |
| | | private UpdateMsg nextOldestUpdateMsg( |
| | | NavigableSet<ReplicationDBCursor> sortedCursors) |
| | | { |
| | | /* |
| | | * The cursors are sorted based on the currentChange of each cursor to |
| | | * consider the next change across all servers. |
| | | * To keep consistent the order of the cursors in the SortedSet, |
| | | * it is necessary to remove and eventually add again a cursor (after moving |
| | | * it forward). |
| | | */ |
| | | final ReplicationDBCursor cursor = sortedCursors.pollFirst(); |
| | | final UpdateMsg result = cursor.getChange(); |
| | | cursor.next(); |
| | | addCursorIfNotEmpty(sortedCursors, cursor); |
| | | return result; |
| | | } |
| | | |
| | | private void addCursorIfNotEmpty(Collection<ReplicationDBCursor> cursors, |
| | | ReplicationDBCursor cursor) |
| | | { |
| | | if (cursor != null) |
| | | { |
| | | if (cursor.getChange() != null) |
| | | { |
| | | cursors.add(cursor); |
| | | } |
| | | else |
| | | { |
| | | close(cursor); |
| | | } |
| | | } |
| | | } |
| | | private void addCursorIfNotEmpty(Collection<ReplicationDBCursor> cursors, |
| | | ReplicationDBCursor cursor) |
| | | { |
| | | if (cursor != null) |
| | | { |
| | | if (cursor.getChange() != null) |
| | | { |
| | | cursors.add(cursor); |
| | | } |
| | | else |
| | | { |
| | | close(cursor); |
| | | } |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Get the older Change Number for that server. |
| | |
| | | the lateQueue when it will send the next update but we are not yet |
| | | there. So let's take the last change not sent directly from the db. |
| | | */ |
| | | result = findOldestChangeNumberFromReplicationDBs(); |
| | | result = findOldestChangeNumberFromReplicationDBs(); |
| | | } |
| | | } |
| | | } |
| | | return result; |
| | | } |
| | | |
| | | private ChangeNumber findOldestChangeNumberFromReplicationDBs() |
| | | { |
| | | SortedSet<ReplicationDBCursor> sortedCursors = null; |
| | | try |
| | | { |
| | | sortedCursors = collectAllCursorsWithChanges(); |
| | | UpdateMsg msg = sortedCursors.first().getChange(); |
| | | return msg.getChangeNumber(); |
| | | } |
| | | catch (Exception e) |
| | | { |
| | | return null; |
| | | } |
| | | finally |
| | | { |
| | | close(sortedCursors); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Collects all the replication DB cursors that have changes and sort them |
| | | * with the oldest {@link ChangeNumber} first. |
| | | * |
| | | * @return a List of cursors with changes sorted by their {@link ChangeNumber} |
| | | * (oldest first) |
| | | */ |
| | | private NavigableSet<ReplicationDBCursor> collectAllCursorsWithChanges() |
| | | private ChangeNumber findOldestChangeNumberFromReplicationDBs() |
| | | { |
| | | final NavigableSet<ReplicationDBCursor> results = |
| | | new TreeSet<ReplicationDBCursor>(new ReplicationDBCursorComparator()); |
| | | SortedSet<ReplicationDBCursor> sortedCursors = null; |
| | | try |
| | | { |
| | | sortedCursors = collectAllCursorsWithChanges(); |
| | | UpdateMsg msg = sortedCursors.first().getChange(); |
| | | return msg.getChangeNumber(); |
| | | } |
| | | catch (Exception e) |
| | | { |
| | | return null; |
| | | } |
| | | finally |
| | | { |
| | | close(sortedCursors); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Collects all the replication DB cursors that have changes and sort them |
| | | * with the oldest {@link ChangeNumber} first. |
| | | * |
| | | * @return a List of cursors with changes sorted by their {@link ChangeNumber} |
| | | * (oldest first) |
| | | */ |
| | | private NavigableSet<ReplicationDBCursor> collectAllCursorsWithChanges() |
| | | { |
| | | final NavigableSet<ReplicationDBCursor> results = |
| | | new TreeSet<ReplicationDBCursor>(new ReplicationDBCursorComparator()); |
| | | for (int serverId : replicationServerDomain.getServerIds()) |
| | | { |
| | | // get the last already sent CN from that server to get a cursor |
| | | final ChangeNumber lastCsn = serverState.getChangeNumber(serverId); |
| | | addCursorIfNotEmpty(results, |
| | | replicationServerDomain.getCursorFrom(serverId, lastCsn)); |
| | | final ChangeNumber lastCsn = serverState.getChangeNumber(serverId); |
| | | addCursorIfNotEmpty(results, |
| | | replicationServerDomain.getCursorFrom(serverId, lastCsn)); |
| | | } |
| | | return results; |
| | | } |