| | |
| | | * restart as usual |
| | | * load this change on the delayList |
| | | */ |
| | | DBCursor<UpdateMsg> cursor = null; |
| | | try |
| | | { |
| | | // fill the lateQueue |
| | | cursor = replicationServerDomain.getCursorFrom(serverState); |
| | | while (cursor.next() && isLateQueueBelowThreshold()) |
| | | { |
| | | lateQueue.add(cursor.getRecord()); |
| | | } |
| | | } |
| | | catch (ChangelogException e) |
| | | { |
| | | TRACER.debugCaught(DebugLogLevel.ERROR, e); |
| | | } |
| | | finally |
| | | { |
| | | close(cursor); |
| | | } |
| | | |
| | | /* |
| | | * If the late queue is empty then we could not find any messages in |
| | | * the replication log so the remote server is not late anymore. |
| | | */ |
| | | fillLateQueue(); |
| | | if (lateQueue.isEmpty()) |
| | | { |
| | | // we could not find any messages in the changelog |
| | | // so the remote server is not late anymore. |
| | | synchronized (msgQueue) |
| | | { |
| | | // Ensure we are below threshold so this server will follow the |
| | |
| | | else |
| | | { |
| | | /* |
| | | * if the first change in the lateQueue is also on the regular |
| | | * queue, we can resume the processing from the regular queue |
| | | * if the first change in the lateQueue is also on the regular queue, |
| | | * we can resume the processing from the regular queue |
| | | * -> set following to true and empty the lateQueue. |
| | | */ |
| | | UpdateMsg msg = lateQueue.first(); |
| | |
| | | { |
| | | // get the next change from the lateQueue |
| | | UpdateMsg msg; |
| | | synchronized (msgQueue) |
| | | synchronized (msgQueue) // TODO JNR why synchronize(msgQueue) here? |
| | | { |
| | | msg = lateQueue.removeFirst(); |
| | | } |
| | |
| | | return null; |
| | | } |
| | | |
| | | private void fillLateQueue() |
| | | { |
| | | DBCursor<UpdateMsg> cursor = null; |
| | | try |
| | | { |
| | | cursor = replicationServerDomain.getCursorFrom(serverState); |
| | | while (cursor.next() && isLateQueueBelowThreshold()) |
| | | { |
| | | lateQueue.add(cursor.getRecord()); |
| | | } |
| | | } |
| | | catch (ChangelogException e) |
| | | { |
| | | TRACER.debugCaught(DebugLogLevel.ERROR, e); |
| | | } |
| | | finally |
| | | { |
| | | close(cursor); |
| | | } |
| | | } |
| | | |
| | | private boolean isLateQueueBelowThreshold() |
| | | { |
| | | return lateQueue.count() < 100 && lateQueue.bytesCount() < 50000; |
| | |
| | | { |
| | | if (!msgQueue.isEmpty()) |
| | | { |
| | | UpdateMsg msg = msgQueue.first(); |
| | | result = msg.getCSN(); |
| | | result = msgQueue.first().getCSN(); |
| | | } |
| | | } |
| | | else |
| | | { |
| | | if (!lateQueue.isEmpty()) |
| | | { |
| | | UpdateMsg msg = lateQueue.first(); |
| | | result = msg.getCSN(); |
| | | result = lateQueue.first().getCSN(); |
| | | } |
| | | else |
| | | { |