| | |
| | | new ReplicationIteratorComparator(); |
| | | SortedSet<ReplicationIterator> iteratorSortedSet = |
| | | new TreeSet<ReplicationIterator>(comparator); |
| | | /* fill the lateQueue */ |
| | | for (int serverId : replicationServerDomain.getServers()) |
| | | try |
| | | { |
| | | ChangeNumber lastCsn = serverState.getMaxChangeNumber(serverId); |
| | | ReplicationIterator iterator = |
| | | replicationServerDomain.getChangelogIterator(serverId, lastCsn); |
| | | if (iterator != null) |
| | | /* fill the lateQueue */ |
| | | for (int serverId : replicationServerDomain.getServers()) |
| | | { |
| | | if (iterator.getChange() != null) |
| | | ChangeNumber lastCsn = serverState |
| | | .getMaxChangeNumber(serverId); |
| | | ReplicationIterator iterator = replicationServerDomain |
| | | .getChangelogIterator(serverId, lastCsn); |
| | | if (iterator != null) |
| | | { |
| | | if (iterator.getChange() != null) |
| | | { |
| | | iteratorSortedSet.add(iterator); |
| | | } |
| | | else |
| | | { |
| | | iterator.releaseCursor(); |
| | | } |
| | | } |
| | | } |
| | | |
| | | // The loop below relies on the fact that it is sorted based |
| | | // on the currentChange of each iterator to consider the next |
| | | // change across all servers. |
| | | // |
| | | // Hence it is necessary to remove and eventually re-add an |
| | | // iterator when looping in order to keep the order of the |
| | | // iterators consistent (see ReplicationIteratorComparator). |
| | | while (!iteratorSortedSet.isEmpty() |
| | | && (lateQueue.count() < 100) |
| | | && (lateQueue.bytesCount() < 50000)) |
| | | { |
| | | ReplicationIterator iterator = iteratorSortedSet |
| | | .first(); |
| | | iteratorSortedSet.remove(iterator); |
| | | lateQueue.add(iterator.getChange()); |
| | | if (iterator.next()) |
| | | { |
| | | iteratorSortedSet.add(iterator); |
| | | } else |
| | | } |
| | | else |
| | | { |
| | | iterator.releaseCursor(); |
| | | } |
| | | } |
| | | } |
| | | |
| | | // The loop below relies on the fact that it is sorted based |
| | | // on the currentChange of each iterator to consider the next |
| | | // change across all servers. |
| | | // Hence it is necessary to remove and eventually re-add an iterator |
| | | // when looping in order to keep the order of the iterators |
| | | // consistent (see ReplicationIteratorComparator). |
| | | while (!iteratorSortedSet.isEmpty() && |
| | | (lateQueue.count()<100) && |
| | | (lateQueue.bytesCount()<50000) ) |
| | | finally |
| | | { |
| | | ReplicationIterator iterator = iteratorSortedSet.first(); |
| | | iteratorSortedSet.remove(iterator); |
| | | lateQueue.add(iterator.getChange()); |
| | | if (iterator.next()) |
| | | iteratorSortedSet.add(iterator); |
| | | else |
| | | for (ReplicationIterator iterator : iteratorSortedSet) |
| | | { |
| | | iterator.releaseCursor(); |
| | | } |
| | | } |
| | | for (ReplicationIterator iterator : iteratorSortedSet) |
| | | { |
| | | iterator.releaseCursor(); |
| | | } |
| | | |
| | | /* |
| | | * If the late queue is empty then we could not find any |
| | | * messages in the replication log so the remote server is not |
| | |
| | | // if that iterator has changes, then it is a candidate |
| | | // it is added in the sorted list at a position given by its |
| | | // current change (see ReplicationIteratorComparator). |
| | | if ((iterator != null) && (iterator.getChange() != null)) |
| | | if (iterator != null) |
| | | { |
| | | iteratorSortedSet.add(iterator); |
| | | if (iterator.getChange() != null) |
| | | { |
| | | iteratorSortedSet.add(iterator); |
| | | } |
| | | else |
| | | { |
| | | iterator.releaseCursor(); |
| | | } |
| | | } |
| | | } |
| | | UpdateMsg msg = iteratorSortedSet.first().getChange(); |