| | |
| | | * |
| | | * |
| | | * Copyright 2009-2010 Sun Microsystems, Inc. |
| | | * Portions copyright 2011-2012 ForgeRock AS |
| | | * Portions copyright 2011-2013 ForgeRock AS |
| | | */ |
| | | package org.opends.server.replication.server; |
| | | |
| | |
| | | while ((msgQueue.count() > maxQueueSize) || |
| | | (msgQueue.bytesCount() > maxQueueBytesSize)) |
| | | { |
| | | setFollowing(false); |
| | | following = false; |
| | | msgQueue.removeFirst(); |
| | | } |
| | | } |
| | |
| | | protected UpdateMsg getNextMessage(boolean synchronous) |
| | | { |
| | | UpdateMsg msg; |
| | | while (activeConsumer == true) |
| | | while (activeConsumer) |
| | | { |
| | | if (following == false) |
| | | if (!following) |
| | | { |
| | | /* this server is late with regard to some other masters |
| | | * in the topology or just joined the topology. |
| | |
| | | if ((msgQueue.count() < maxQueueSize) && |
| | | (msgQueue.bytesCount() < maxQueueBytesSize)) |
| | | { |
| | | setFollowing(true); |
| | | following = true; |
| | | } |
| | | } |
| | | } else |
| | |
| | | if (msgQueue.contains(msg)) |
| | | { |
| | | /* we finally catch up with the regular queue */ |
| | | setFollowing(true); |
| | | following = true; |
| | | lateQueue.clear(); |
| | | UpdateMsg msg1; |
| | | do |
| | |
| | | } |
| | | synchronized (msgQueue) |
| | | { |
| | | if (following == true) |
| | | if (following) |
| | | { |
| | | try |
| | | { |
| | | while (msgQueue.isEmpty() && (following == true)) |
| | | while (msgQueue.isEmpty() && following) |
| | | { |
| | | if (!synchronous) |
| | | return null; |
| | |
| | | ChangeNumber result = null; |
| | | synchronized (msgQueue) |
| | | { |
| | | if (isFollowing()) |
| | | if (following) |
| | | { |
| | | if (msgQueue.isEmpty()) |
| | | { |
| | |
| | | { |
| | | if (lateQueue.isEmpty()) |
| | | { |
| | | // isFollowing is false AND lateQueue is empty |
| | | // We may be at the very moment when the writer has emptyed the |
| | | // lateQueue when it sent the last update. The writer will fill again |
| | | // the lateQueue when it will send the next update but we are not yet |
| | | // there. So let's take the last change not sent directly from |
| | | // the db. |
| | | |
| | | /* |
| | | following is false AND lateQueue is empty |
| | | We may be at the very moment when the writer has emptied the |
| | | lateQueue when it sent the last update. The writer will fill again |
| | | the lateQueue when it will send the next update but we are not yet |
| | | there. So let's take the last change not sent directly from |
| | | the db. |
| | | */ |
| | | ReplicationIteratorComparator comparator = |
| | | new ReplicationIteratorComparator(); |
| | | SortedSet<ReplicationIterator> iteratorSortedSet = |
| | |
| | | // get an iterator in this server db from that last change |
| | | ReplicationIterator iterator = |
| | | replicationServerDomain.getChangelogIterator(serverId, lastCsn); |
| | | // if that iterator has changes, then it is a candidate |
| | | // it is added in the sorted list at a position given by its |
| | | // current change (see ReplicationIteratorComparator). |
| | | /* |
| | | if that iterator has changes, then it is a candidate |
| | | it is added in the sorted list at a position given by its |
| | | current change (see ReplicationIteratorComparator). |
| | | */ |
| | | if (iterator != null) |
| | | { |
| | | if (iterator.getChange() != null) |
| | |
| | | * When the server is up to date or close to be up to date, |
| | | * the number of updates to be sent is the size of the receive queue. |
| | | */ |
| | | if (isFollowing()) |
| | | if (following) |
| | | return msgQueue.count(); |
| | | else |
| | | { |
| | |
| | | } |
| | | |
| | | /** |
| | | * Check if the LDAP server can follow the speed of the other servers. |
| | | * @return true when the server has all the not yet sent changes |
| | | * in its queue. |
| | | */ |
| | | public boolean isFollowing() |
| | | { |
| | | return following; |
| | | } |
| | | |
| | | /** |
| | | * Set that the consumer is now becoming inactive and thus getNextMessage |
| | | * should not return any UpdateMsg any more. |
| | | * @param active the provided state of the consumer. |
| | |
| | | this.activeConsumer = active; |
| | | } |
| | | |
| | | /** |
| | | * Set the following flag of this server. |
| | | * @param following the value that should be set. |
| | | */ |
| | | private void setFollowing(boolean following) |
| | | { |
| | | this.following = following; |
| | | } |
| | | |
| | | /** |
| | | * Set the initial value of the serverState for this handler. |