opendj-server-legacy/src/main/java/org/opends/server/replication/server/MessageHandler.java
@@ -31,7 +31,6 @@ import java.util.ArrayList; import java.util.List; import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; import org.forgerock.i18n.LocalizableMessage; @@ -234,19 +233,14 @@ } /** * Get the next update that must be sent to the consumer * from the message queue or from the database. * Get the next update that must be sent to the consumer from the message queue or from the * database. * * @param connectedReplicaIds * Ids of replicas to accept when returning a message from * the late queue. * @param synchronous specifies what to do when the queue is empty. * when true, the method blocks; when false the method return null. * * @return The next update that must be sent to the consumer. * null when synchronous is false and queue is empty. * @param sendToServerId * serverId of the replica where changes will be sent * @return The next update that must be sent to the consumer, or {@code null} when queue is empty */ protected UpdateMsg getNextMessage(Set<Integer> connectedReplicaIds, boolean synchronous) protected UpdateMsg getNextMessage(int sendToServerId) { while (activeConsumer) { @@ -279,7 +273,7 @@ * restart as usual * load this change on the delayList */ fillLateQueue(connectedReplicaIds); fillLateQueue(sendToServerId); if (lateQueue.isEmpty()) { // we could not find any messages in the changelog @@ -342,10 +336,6 @@ { while (msgQueue.isEmpty() && following) { if (!synchronous) { return null; } msgQueue.wait(500); if (!activeConsumer) { @@ -381,14 +371,14 @@ * Fills the late queue with the most recent changes, accepting only the * messages from provided replica ids. */ private void fillLateQueue(Set<Integer> connectedReplicaIds) private void fillLateQueue(int sendToServerId) { try (DBCursor<UpdateMsg> cursor = replicationServerDomain.getCursorFrom(serverState);) { while (cursor.next() && isLateQueueBelowThreshold()) { final UpdateMsg record = cursor.getRecord(); if (connectedReplicaIds.contains(record.getCSN().getServerId())) if (record.getCSN().getServerId() != sendToServerId) { lateQueue.add(record); } opendj-server-legacy/src/main/java/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -31,7 +31,6 @@ import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Map.Entry; @@ -1265,21 +1264,6 @@ } /** * Get the next update that need to be sent to a given LDAP server. * This call is blocking when no update is available or when dependencies * do not allow to send the next available change * * @param sHandler The server handler for the target directory server. * * @return the update that must be forwarded */ public UpdateMsg take(ServerHandler sHandler) { // Next message can only be taken from connected DSs return sHandler.take(new HashSet<Integer>(getConnectedDSs().keySet())); } /** * Creates and returns a cursor across this replication domain. * <p> * Client code must call {@link DBCursor#next()} to advance the cursor to the opendj-server-legacy/src/main/java/org/opends/server/replication/server/ServerHandler.java
@@ -31,7 +31,6 @@ import java.io.IOException; import java.util.List; import java.util.Random; import java.util.Set; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -603,15 +602,6 @@ /** * Increment the number of updates sent to the server in assured safe data * mode. */ private void incrementAssuredSdSentUpdates() { assuredSdSentUpdates++; } /** * Increment the number of updates sent to the server in assured safe data * mode that timed out. */ public void incrementAssuredSdSentUpdatesTimeout() @@ -639,15 +629,6 @@ /** * Increment the number of updates sent to the server in assured safe read * mode. */ private void incrementAssuredSrSentUpdates() { assuredSrSentUpdates++; } /** * Increment the number of updates sent to the server in assured safe read * mode that timed out. */ public void incrementAssuredSrSentUpdatesTimeout() @@ -942,17 +923,31 @@ * Select the next update that must be sent to the server managed by this * ServerHandler. * * @param connectedReplicaIds * Ids of replicas to accept when returning a message. * @return the next update that must be sent to the server managed by this * ServerHandler. */ public UpdateMsg take(Set<Integer> connectedReplicaIds) public UpdateMsg take() { boolean interrupted = true; UpdateMsg msg = getNextMessage(connectedReplicaIds, true); // synchronous:block until msg final UpdateMsg msg = getNextMessage(serverId); acquirePermitInSendWindow(); if (msg != null) { incrementOutCount(); if (msg.isAssured()) { incrementAssuredStats(msg); } return msg; } return null; } private void acquirePermitInSendWindow() { boolean acquired = false; boolean interrupted = true; do { try @@ -964,22 +959,18 @@ // loop until not interrupted } } while ((interrupted || !acquired) && !shutdownWriter); if (msg != null) { incrementOutCount(); } if (msg.isAssured()) { if (msg.getAssuredMode() == AssuredMode.SAFE_READ_MODE) { incrementAssuredSrSentUpdates(); } else if (!isDataServer()) { incrementAssuredSdSentUpdates(); } } private void incrementAssuredStats(final UpdateMsg msg) { if (msg.getAssuredMode() == AssuredMode.SAFE_READ_MODE) { assuredSrSentUpdates++; } return msg; else if (!isDataServer()) { assuredSdSentUpdates++; } } /** opendj-server-legacy/src/main/java/org/opends/server/replication/server/ServerWriter.java
@@ -22,7 +22,7 @@ * * * Copyright 2006-2009 Sun Microsystems, Inc. * Portions Copyright 2011-2014 ForgeRock AS * Portions Copyright 2011-2015 ForgeRock AS */ package org.opends.server.replication.server; @@ -101,7 +101,7 @@ while (!shutdown || !dsrsShutdownSync.canShutdown(replicationServerDomain.getBaseDN())) { final UpdateMsg updateMsg = replicationServerDomain.take(this.handler); final UpdateMsg updateMsg = this.handler.take(); if (updateMsg == null) { // this connection is closing