opends/src/server/org/opends/server/replication/server/MessageHandler.java
@@ -28,6 +28,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; import org.opends.messages.Message; @@ -255,12 +256,16 @@ * Get the next update that must be sent to the consumer * from the message queue or from the database. * * @param connectedReplicaIds * Ids of replicas to accept when returning a message from * the late queue. * @param synchronous specifies what to do when the queue is empty. * when true, the method blocks; when false the method return null. * * @return The next update that must be sent to the consumer. * null when synchronous is false and queue is empty. */ protected UpdateMsg getNextMessage(boolean synchronous) protected UpdateMsg getNextMessage(Set<Integer> connectedReplicaIds, boolean synchronous) { while (activeConsumer) { @@ -293,7 +298,7 @@ * restart as usual * load this change on the delayList */ fillLateQueue(); fillLateQueue(connectedReplicaIds); if (lateQueue.isEmpty()) { // we could not find any messages in the changelog @@ -387,7 +392,11 @@ return null; } private void fillLateQueue() /** * Fills the late queue with the most recent changes, accepting only the * messages from provided replica ids. */ private void fillLateQueue(Set<Integer> connectedReplicaIds) { DBCursor<UpdateMsg> cursor = null; try @@ -395,7 +404,11 @@ cursor = replicationServerDomain.getCursorFrom(serverState); while (cursor.next() && isLateQueueBelowThreshold()) { lateQueue.add(cursor.getRecord()); final UpdateMsg record = cursor.getRecord(); if (connectedReplicaIds.contains(record.getCSN().getServerId())) { lateQueue.add(record); } } } catch (ChangelogException e) opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -31,9 +31,11 @@ import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Set; import java.util.Timer; import java.util.TimerTask; import java.util.concurrent.ConcurrentHashMap; @@ -1291,15 +1293,14 @@ */ public UpdateMsg take(ServerHandler sHandler) { /* * Get the balanced tree that we use to sort the changes to be * sent to the replica from the cookie * * The next change to send is always the first one in the tree * So this methods simply need to check that dependencies are OK * and update this replicaId RUV */ return sHandler.take(); // Next message can only be taken from connected DSs final Set<Integer> connectedReplicaIds = new HashSet<Integer>(getConnectedDSs().keySet()); if (sHandler.isDataServer()) { // Prevents sending to a DS its own messages connectedReplicaIds.remove(sHandler.getServerId()); } return sHandler.take(connectedReplicaIds); } /** opends/src/server/org/opends/server/replication/server/ServerHandler.java
@@ -29,6 +29,7 @@ import java.io.IOException; import java.util.List; import java.util.Random; import java.util.Set; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -960,13 +961,15 @@ * Select the next update that must be sent to the server managed by this * ServerHandler. * * @param connectedReplicaIds * Ids of replicas to accept when returning a message. * @return the next update that must be sent to the server managed by this * ServerHandler. */ public UpdateMsg take() public UpdateMsg take(Set<Integer> connectedReplicaIds) { boolean interrupted = true; UpdateMsg msg = getNextMessage(true); // synchronous:block until msg UpdateMsg msg = getNextMessage(connectedReplicaIds, true); // synchronous:block until msg boolean acquired = false; do