| | |
| | | } |
| | | |
| | | /** |
| | | * Get the next update that must be sent to the consumer from the message queue or from the |
| | | * database. |
| | | * Get the next update that must be sent to the consumer from the message queue or from the database. |
| | | * |
| | | * @param sendToServerId |
| | | * serverId of the replica where changes will be sent |
| | | * @return The next update that must be sent to the consumer, or {@code null} when queue is empty |
| | | * @throws ChangelogException |
| | | * If a problem occurs when reading the changelog |
| | | */ |
| | | protected UpdateMsg getNextMessage(int sendToServerId) throws ChangelogException |
| | | protected UpdateMsg getNextMessage() throws ChangelogException |
| | | { |
| | | while (activeConsumer) |
| | | { |
| | |
| | | * restart as usual |
| | | * load this change on the delayList |
| | | */ |
| | | fillLateQueue(sendToServerId); |
| | | fillLateQueue(); |
| | | if (lateQueue.isEmpty()) |
| | | { |
| | | // we could not find any messages in the changelog |
| | |
| | | * Fills the late queue with the most recent changes, accepting only the |
| | | * messages from provided replica ids. |
| | | */ |
| | | private void fillLateQueue(int sendToServerId) throws ChangelogException |
| | | private void fillLateQueue() throws ChangelogException |
| | | { |
| | | try (DBCursor<UpdateMsg> cursor = replicationServerDomain.getCursorFrom(serverState);) |
| | | { |
| | | while (cursor.next() && isLateQueueBelowThreshold()) |
| | | { |
| | | final UpdateMsg record = cursor.getRecord(); |
| | | if (record.getCSN().getServerId() != sendToServerId) |
| | | { |
| | | lateQueue.add(record); |
| | | } |
| | | lateQueue.add(cursor.getRecord()); |
| | | } |
| | | } |
| | | } |
| | |
| | | |
| | | private CSN findOldestCSNFromReplicaDBs() |
| | | { |
| | | try (DBCursor<UpdateMsg> cursor = replicationServerDomain.getCursorFrom(serverState);) |
| | | try (DBCursor<UpdateMsg> cursor = replicationServerDomain.getCursorFrom(serverState)) |
| | | { |
| | | while (cursor.next()) |
| | | { |