| | |
| | | */ |
| | | package org.opends.server.replication.server; |
| | | |
| | | import java.util.ArrayList; |
| | | import java.util.List; |
| | | import java.util.SortedSet; |
| | | import java.util.TreeSet; |
| | | import java.util.*; |
| | | import java.util.concurrent.atomic.AtomicBoolean; |
| | | |
| | | import org.opends.messages.Message; |
| | |
| | | */ |
| | | protected UpdateMsg getNextMessage(boolean synchronous) |
| | | { |
| | | UpdateMsg msg; |
| | | while (activeConsumer) |
| | | { |
| | | UpdateMsg msg; |
| | | if (!following) |
| | | { |
| | | /* this server is late with regard to some other masters |
| | |
| | | * restart as usual |
| | | * load this change on the delayList |
| | | */ |
| | | SortedSet<ReplicationIterator> iteratorSortedSet = null; |
| | | NavigableSet<ReplicationDBCursor> sortedCursors = null; |
| | | try |
| | | { |
| | | iteratorSortedSet = collectAllIteratorsWithChanges(); |
| | | sortedCursors = collectAllCursorsWithChanges(); |
| | | |
| | | /* fill the lateQueue */ |
| | | // The loop below relies on the fact that it is sorted based |
| | | // on the currentChange of each iterator to consider the next |
| | | // change across all servers. |
| | | // |
| | | // Hence it is necessary to remove and eventual add again an |
| | | // iterator when looping in order to keep consistent the order of |
| | | // the iterators (see ReplicationIteratorComparator. |
| | | while (!iteratorSortedSet.isEmpty() |
| | | && (lateQueue.count() < 100) |
| | | && (lateQueue.bytesCount() < 50000)) |
| | | // fill the lateQueue |
| | | while (!sortedCursors.isEmpty() |
| | | && lateQueue.count() < 100 |
| | | && lateQueue.bytesCount() < 50000) |
| | | { |
| | | ReplicationIterator iterator = iteratorSortedSet.first(); |
| | | iteratorSortedSet.remove(iterator); |
| | | lateQueue.add(iterator.getChange()); |
| | | addIteratorIfNotEmpty(iteratorSortedSet, iterator); |
| | | lateQueue.add(nextOldestUpdateMsg(sortedCursors)); |
| | | } |
| | | } |
| | | finally |
| | | { |
| | | close(iteratorSortedSet); |
| | | close(sortedCursors); |
| | | } |
| | | |
| | | /* |
| | |
| | | following = true; |
| | | } |
| | | } |
| | | } else |
| | | } |
| | | else |
| | | { |
| | | /* |
| | | * if the first change in the lateQueue is also on the regular |
| | |
| | | } |
| | | } |
| | | } |
| | | } else |
| | | } |
| | | else |
| | | { |
| | | /* get the next change from the lateQueue */ |
| | | // get the next change from the lateQueue |
| | | synchronized (msgQueue) |
| | | { |
| | | msg = lateQueue.removeFirst(); |
| | |
| | | return null; |
| | | } |
| | | |
| | | private void addIteratorIfNotEmpty(SortedSet<ReplicationIterator> iterators, |
| | | ReplicationIterator iter) |
| | | { |
| | | if (iter.next()) |
| | | { |
| | | iterators.add(iter); |
| | | } |
| | | else |
| | | { |
| | | close(iter); |
| | | } |
| | | } |
| | | private UpdateMsg nextOldestUpdateMsg( |
| | | NavigableSet<ReplicationDBCursor> sortedCursors) |
| | | { |
| | | /* |
| | | * The cursors are sorted based on the currentChange of each cursor to |
| | | * consider the next change across all servers. |
| | | * To keep consistent the order of the cursors in the SortedSet, |
| | | * it is necessary to remove and eventually add again a cursor (after moving |
| | | * it forward). |
| | | */ |
| | | final ReplicationDBCursor cursor = sortedCursors.pollFirst(); |
| | | final UpdateMsg result = cursor.getChange(); |
| | | cursor.next(); |
| | | addCursorIfNotEmpty(sortedCursors, cursor); |
| | | return result; |
| | | } |
| | | |
| | | private void addCursorIfNotEmpty(Collection<ReplicationDBCursor> cursors, |
| | | ReplicationDBCursor cursor) |
| | | { |
| | | if (cursor != null) |
| | | { |
| | | if (cursor.getChange() != null) |
| | | { |
| | | cursors.add(cursor); |
| | | } |
| | | else |
| | | { |
| | | close(cursor); |
| | | } |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Get the older Change Number for that server. |
| | |
| | | We may be at the very moment when the writer has emptied the |
| | | lateQueue when it sent the last update. The writer will fill again |
| | | the lateQueue when it will send the next update but we are not yet |
| | | there. So let's take the last change not sent directly from |
| | | the db. |
| | | there. So let's take the last change not sent directly from the db. |
| | | */ |
| | | SortedSet<ReplicationIterator> iteratorSortedSet = null; |
| | | try |
| | | { |
| | | iteratorSortedSet = collectAllIteratorsWithChanges(); |
| | | UpdateMsg msg = iteratorSortedSet.first().getChange(); |
| | | result = msg.getChangeNumber(); |
| | | } catch (Exception e) |
| | | { |
| | | result = null; |
| | | } finally |
| | | { |
| | | close(iteratorSortedSet); |
| | | } |
| | | result = findOldestChangeNumberFromReplicationDBs(); |
| | | } |
| | | } |
| | | } |
| | | return result; |
| | | } |
| | | |
| | | private SortedSet<ReplicationIterator> collectAllIteratorsWithChanges() |
| | | { |
| | | SortedSet<ReplicationIterator> results = |
| | | new TreeSet<ReplicationIterator>(new ReplicationIteratorComparator()); |
| | | private ChangeNumber findOldestChangeNumberFromReplicationDBs() |
| | | { |
| | | SortedSet<ReplicationDBCursor> sortedCursors = null; |
| | | try |
| | | { |
| | | sortedCursors = collectAllCursorsWithChanges(); |
| | | UpdateMsg msg = sortedCursors.first().getChange(); |
| | | return msg.getChangeNumber(); |
| | | } |
| | | catch (Exception e) |
| | | { |
| | | return null; |
| | | } |
| | | finally |
| | | { |
| | | close(sortedCursors); |
| | | } |
| | | } |
| | | |
| | | // Build a list of candidates iterator (i.e. db i.e. server) |
| | | /** |
| | | * Collects all the replication DB cursors that have changes and sort them |
| | | * with the oldest {@link ChangeNumber} first. |
| | | * |
| | | * @return a List of cursors with changes sorted by their {@link ChangeNumber} |
| | | * (oldest first) |
| | | */ |
| | | private NavigableSet<ReplicationDBCursor> collectAllCursorsWithChanges() |
| | | { |
| | | final NavigableSet<ReplicationDBCursor> results = |
| | | new TreeSet<ReplicationDBCursor>(new ReplicationDBCursorComparator()); |
| | | for (int serverId : replicationServerDomain.getServerIds()) |
| | | { |
| | | // get the last already sent CN from that server |
| | | ChangeNumber lastCsn = serverState.getChangeNumber(serverId); |
| | | // get an iterator in this server db from that last change |
| | | ReplicationIterator iter = |
| | | replicationServerDomain.getChangelogIterator(serverId, lastCsn); |
| | | /* |
| | | if that iterator has changes, then it is a candidate |
| | | it is added in the sorted list at a position given by its |
| | | current change (see ReplicationIteratorComparator). |
| | | */ |
| | | if (iter != null) |
| | | { |
| | | if (iter.getChange() != null) |
| | | { |
| | | results.add(iter); |
| | | } |
| | | else |
| | | { |
| | | close(iter); |
| | | } |
| | | } |
| | | // get the last already sent CN from that server to get a cursor |
| | | final ChangeNumber lastCsn = serverState.getChangeNumber(serverId); |
| | | addCursorIfNotEmpty(results, |
| | | replicationServerDomain.getCursorFrom(serverId, lastCsn)); |
| | | } |
| | | return results; |
| | | } |