opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/ChangelogCache.java
@@ -42,7 +42,6 @@ import org.opends.server.synchronization.protocol.UpdateMessage; import java.io.IOException; import java.util.HashSet; import java.util.LinkedHashSet; import java.util.Map; import java.util.Set; @@ -192,15 +191,11 @@ /* * Push the message to the changelog servers */ HashSet<ServerHandler> saturatedHandler = new HashSet<ServerHandler>(); if (!sourceHandler.isChangelogServer()) { for (ServerHandler handler : changelogServers.values()) { handler.add(update); if (handler.isSaturated(update.getChangeNumber(), sourceHandler)) saturatedHandler.add(handler); handler.add(update, sourceHandler); } } @@ -215,45 +210,11 @@ continue; } handler.add(update); if (handler.isSaturated(update.getChangeNumber(), sourceHandler)) saturatedHandler.add(handler); handler.add(update, sourceHandler); } /* * Check if some flow control must be done. * Flow control is done by stopping the reader thread of this * server. The messages will therefore accumulate in the TCP socket * and will block the writer thread(s) on the other side. */ while (!saturatedHandler.isEmpty()) { HashSet<ServerHandler> restartedHandler = new HashSet<ServerHandler>(); for (ServerHandler handler : saturatedHandler) { if (handler.restartAfterSaturation(sourceHandler)) { restartedHandler.add(handler); } } for (ServerHandler handler : restartedHandler) saturatedHandler.remove(handler); synchronized(flowControlLock) { if (!saturatedHandler.isEmpty()) { try { flowControlLock.wait(100); } catch (Exception e) { // just loop } } } } } /** @@ -568,4 +529,44 @@ { return "ChangelogCache " + baseDn; } /** * Check if some server Handler should be removed from flow control state. * The window check is run on every changelog server handler and then on * every connected server handler. * @throws IOException If the window check of one of the handlers fails. 
*/ public void checkAllSaturation() throws IOException { for (ServerHandler handler : changelogServers.values()) { handler.checkWindow(); } for (ServerHandler handler : connectedServers.values()) { handler.checkWindow(); } } /** * Check if a server that was in flow control can now restart * sending updates. Every changelog server handler and then every * connected server handler is asked; the first refusal ends the check. * @param sourceHandler The server that must be checked. * @return true if all the handlers accept that the server restarts * sending changes, false as soon as one handler refuses. */ public boolean restartAfterSaturation(ServerHandler sourceHandler) { for (ServerHandler handler : changelogServers.values()) { if (!handler.restartAfterSaturation(sourceHandler)) return false; } for (ServerHandler handler : connectedServers.values()) { if (!handler.restartAfterSaturation(sourceHandler)) return false; } return true; } } opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/ServerHandler.java
@@ -103,6 +103,10 @@ private ServerReader reader; private Semaphore sendWindow; private int sendWindowSize; private boolean flowControl = false; // indicate that the server is // flow controlled and should // be stopped from sending messages. private int saturationCount = 0; private static Map<ChangeNumber, ChangelogAckMessageList> changelogsWaitingAcks = new HashMap<ChangeNumber, ChangelogAckMessageList>(); @@ -594,8 +598,9 @@ * managed by this ServerHandler. * * @param update The update that must be added to the list of updates. * @param sourceHandler The server that sent the update. */ public void add(UpdateMessage update) public void add(UpdateMessage update, ServerHandler sourceHandler) { synchronized (msgQueue) { @@ -617,6 +622,17 @@ msgQueue.removeFirst(); } } if (isSaturated(update.getChangeNumber(), sourceHandler)) { sourceHandler.setSaturated(true); } } private void setSaturated(boolean value) { flowControl = value; } /** @@ -630,6 +646,24 @@ { boolean interrupted = true; UpdateMessage msg = getnextMessage(); /* * When we remove a message from the queue we need to check if another * server is waiting in flow control because this queue was too long. * This check might cause a performance penalty and therefore it * is not done for every message removed but only every few messages. */ if (++saturationCount > 10) { saturationCount = 0; try { changelogCache.checkAllSaturation(); } catch (IOException e) { /* NOTE(review): the IOException is dropped without any handling - confirm this is intended */ } } do { try { @@ -1143,19 +1177,40 @@ } /** * Decrement the protocol window, then check if it is necessary * to send a WindowMessage and send it. * * @throws IOException when the session becomes unavailable. */ public synchronized void decAndCheckWindow() throws IOException { rcvWindow--; checkWindow(); } /** * Check the protocol window and send WindowMessage if necessary. * * @throws IOException when the session becomes unavailable. 
*/ public synchronized void checkWindow() throws IOException { rcvWindow--; if (rcvWindow < rcvWindowSizeHalf) { WindowMessage msg = new WindowMessage(rcvWindowSizeHalf); session.publish(msg); outAckCount++; rcvWindow += rcvWindowSizeHalf; if (flowControl) { if (changelogCache.restartAfterSaturation(this)) { flowControl = false; } } if (!flowControl) { WindowMessage msg = new WindowMessage(rcvWindowSizeHalf); session.publish(msg); outAckCount++; rcvWindow += rcvWindowSizeHalf; } } } opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/ServerReader.java
@@ -102,12 +102,13 @@ if (msg instanceof AckMessage) { AckMessage ack = (AckMessage) msg; handler.checkWindow(); changelogCache.ack(ack, serverId); } else if (msg instanceof UpdateMessage) { UpdateMessage update = (UpdateMessage) msg; handler.checkWindow(); handler.decAndCheckWindow(); changelogCache.put(update, handler); } else if (msg instanceof WindowMessage) opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/changelog/ChangelogTest.java
@@ -464,7 +464,7 @@ * This test is configured for a relatively low stress * but can be changed using TOTAL_MSG and THREADS consts. */ @Test(enabled=false, groups="slow") @Test(enabled=true, groups="slow") public void multipleWriterMultipleReader() throws Exception { ChangelogBroker server = null;