| | |
| | | * unlock memory tree |
| | | * restart as usual |
| | | * load this change on the delayList |
| | | * |
| | | */ |
| | | SortedSet<ReplicationIterator> iteratorSortedSet = |
| | | new TreeSet<ReplicationIterator>( |
| | | new ReplicationIteratorComparator()); |
| | | SortedSet<ReplicationIterator> iteratorSortedSet = null; |
| | | try |
| | | { |
| | | /* fill the lateQueue */ |
| | | for (int serverId : replicationServerDomain.getServers()) |
| | | { |
| | | ChangeNumber lastCsn = serverState.getChangeNumber(serverId); |
| | | ReplicationIterator iterator = replicationServerDomain |
| | | .getChangelogIterator(serverId, lastCsn); |
| | | if (iterator != null) |
| | | { |
| | | if (iterator.getChange() != null) |
| | | { |
| | | iteratorSortedSet.add(iterator); |
| | | } |
| | | else |
| | | { |
| | | iterator.releaseCursor(); |
| | | } |
| | | } |
| | | } |
| | | iteratorSortedSet = collectAllIteratorsWithChanges(); |
| | | |
| | | /* fill the lateQueue */ |
| | | // The loop below relies on the fact that it is sorted based |
| | | // on the currentChange of each iterator to consider the next |
| | | // change across all servers. |
| | |
| | | ReplicationIterator iterator = iteratorSortedSet.first(); |
| | | iteratorSortedSet.remove(iterator); |
| | | lateQueue.add(iterator.getChange()); |
| | | if (iterator.next()) |
| | | { |
| | | iteratorSortedSet.add(iterator); |
| | | } |
| | | else |
| | | { |
| | | iterator.releaseCursor(); |
| | | } |
| | | addIteratorIfNotEmpty(iteratorSortedSet, iterator); |
| | | } |
| | | } |
| | | finally |
| | | { |
| | | for (ReplicationIterator iterator : iteratorSortedSet) |
| | | { |
| | | iterator.releaseCursor(); |
| | | } |
| | | releaseAllIterators(iteratorSortedSet); |
| | | } |
| | | |
| | | /* |
| | |
| | | * messages in the replication log so the remote server is not |
| | | * late anymore. |
| | | */ |
| | | |
| | | if (lateQueue.isEmpty()) |
| | | { |
| | | synchronized (msgQueue) |
| | |
| | | return null; |
| | | } |
| | | |
| | | private void addIteratorIfNotEmpty(SortedSet<ReplicationIterator> iterators, |
| | | ReplicationIterator iter) |
| | | { |
| | | if (iter.next()) |
| | | { |
| | | iterators.add(iter); |
| | | } |
| | | else |
| | | { |
| | | iter.releaseCursor(); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Get the older Change Number for that server. |
| | | * Returns null when the queue is empty. |
| | |
| | | } |
| | | else |
| | | { |
| | | if (lateQueue.isEmpty()) |
| | | if (!lateQueue.isEmpty()) |
| | | { |
| | | UpdateMsg msg = lateQueue.first(); |
| | | result = msg.getChangeNumber(); |
| | | } |
| | | else |
| | | { |
| | | /* |
| | | following is false AND lateQueue is empty |
| | |
| | | there. So let's take the last change not sent directly from |
| | | the db. |
| | | */ |
| | | SortedSet<ReplicationIterator> iteratorSortedSet = |
| | | new TreeSet<ReplicationIterator>( |
| | | new ReplicationIteratorComparator()); |
| | | SortedSet<ReplicationIterator> iteratorSortedSet = null; |
| | | try |
| | | { |
| | | // Build a list of candidates iterator (i.e. db i.e. server) |
| | | for (int serverId : replicationServerDomain.getServers()) |
| | | { |
| | | // get the last already sent CN from that server |
| | | ChangeNumber lastCsn = serverState.getChangeNumber(serverId); |
| | | // get an iterator in this server db from that last change |
| | | ReplicationIterator iterator = |
| | | replicationServerDomain.getChangelogIterator(serverId, lastCsn); |
| | | /* |
| | | if that iterator has changes, then it is a candidate |
| | | it is added in the sorted list at a position given by its |
| | | current change (see ReplicationIteratorComparator). |
| | | */ |
| | | if (iterator != null) |
| | | { |
| | | if (iterator.getChange() != null) |
| | | { |
| | | iteratorSortedSet.add(iterator); |
| | | } |
| | | else |
| | | { |
| | | iterator.releaseCursor(); |
| | | } |
| | | } |
| | | } |
| | | iteratorSortedSet = collectAllIteratorsWithChanges(); |
| | | UpdateMsg msg = iteratorSortedSet.first().getChange(); |
| | | result = msg.getChangeNumber(); |
| | | } catch (Exception e) |
| | |
| | | result = null; |
| | | } finally |
| | | { |
| | | for (ReplicationIterator iterator : iteratorSortedSet) |
| | | { |
| | | iterator.releaseCursor(); |
| | | } |
| | | releaseAllIterators(iteratorSortedSet); |
| | | } |
| | | } else |
| | | { |
| | | UpdateMsg msg = lateQueue.first(); |
| | | result = msg.getChangeNumber(); |
| | | } |
| | | } |
| | | } |
| | | return result; |
| | | } |
| | | |
| | | /**
| | |  * Opens a changelog iterator for every server of the replication domain,
| | |  * starting from the change number recorded in {@code serverState} for that
| | |  * server, and collects the iterators that currently expose a change.
| | |  * <p>
| | |  * An iterator with no current change has its cursor released immediately.
| | |  * If collecting fails part-way, every iterator gathered so far is released
| | |  * before the failure propagates: the caller never receives the set in that
| | |  * case and therefore could not release those cursors itself.
| | |  *
| | |  * @return the candidate iterators, ordered by
| | |  *         {@code ReplicationIteratorComparator}; never null, possibly empty.
| | |  *         The caller is responsible for releasing their cursors.
| | |  */
| | | private SortedSet<ReplicationIterator> collectAllIteratorsWithChanges()
| | | {
| | |   SortedSet<ReplicationIterator> results =
| | |       new TreeSet<ReplicationIterator>(new ReplicationIteratorComparator());
| | |
| | |   boolean completed = false;
| | |   try
| | |   {
| | |     // Build the set of candidate iterators (one per server, i.e. per db).
| | |     for (int serverId : replicationServerDomain.getServers())
| | |     {
| | |       // Get the last already sent CN from that server.
| | |       ChangeNumber lastCsn = serverState.getChangeNumber(serverId);
| | |       // Get an iterator in this server db from that last change.
| | |       ReplicationIterator iter =
| | |           replicationServerDomain.getChangelogIterator(serverId, lastCsn);
| | |       if (iter == null)
| | |       {
| | |         continue;
| | |       }
| | |
| | |       /*
| | |        * If that iterator has a change, it is a candidate: it is added to
| | |        * the sorted set at a position given by its current change (see
| | |        * ReplicationIteratorComparator). In every other outcome (no change,
| | |        * rejected by the set as a duplicate, or an exception) nobody else
| | |        * holds the iterator, so its cursor is released here.
| | |        */
| | |       boolean kept = false;
| | |       try
| | |       {
| | |         kept = iter.getChange() != null && results.add(iter);
| | |       }
| | |       finally
| | |       {
| | |         if (!kept)
| | |         {
| | |           iter.releaseCursor();
| | |         }
| | |       }
| | |     }
| | |     completed = true;
| | |   }
| | |   finally
| | |   {
| | |     if (!completed)
| | |     {
| | |       // Failure while collecting: do not leak the cursors opened so far.
| | |       for (ReplicationIterator collected : results)
| | |       {
| | |         collected.releaseCursor();
| | |       }
| | |     }
| | |   }
| | |   return results;
| | | }
| | | |
| | | /**
| | |  * Releases the cursor of every iterator contained in the given set.
| | |  *
| | |  * @param iterators the iterators whose cursors must be released;
| | |  *                  {@code null} is accepted and treated as nothing to do
| | |  */
| | | private void releaseAllIterators(SortedSet<ReplicationIterator> iterators)
| | | {
| | |   if (iterators == null)
| | |   {
| | |     // Nothing was collected, so there is nothing to release.
| | |     return;
| | |   }
| | |
| | |   for (ReplicationIterator current : iterators)
| | |   {
| | |     current.releaseCursor();
| | |   }
| | | }
| | | |
| | | /** |
| | | * Get the count of updates sent to this server. |
| | | * @return The count of updates sent to this server. |