opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplayThread.java
@@ -56,6 +56,7 @@ private BlockingQueue<UpdateToReplay> updateToReplayQueue = null; private boolean shutdown = false; private boolean done = false; private static int count = 0; /** * Constructor for the ReplayThread. @@ -64,7 +65,7 @@ */ public ReplayThread(BlockingQueue<UpdateToReplay> updateToReplayQueue) { super("Replication Replay thread"); super("Replication Replay thread " + count++); this.updateToReplayQueue = updateToReplayQueue; } @@ -130,7 +131,7 @@ { try { while (done == false) while ((done == false) && (this.isAlive())) { Thread.sleep(50); } opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java
@@ -954,17 +954,9 @@ { WindowMessage windowMsg = (WindowMessage) msg; sendWindow.release(windowMsg.getNumAck()); } else { if (msg instanceof UpdateMessage) { rcvWindow--; if (rcvWindow < halfRcvWindow) { session.publish(new WindowMessage(halfRcvWindow)); rcvWindow += halfRcvWindow; } } else { return msg; } } catch (SocketTimeoutException e) @@ -988,6 +980,30 @@ } /** * Performs the receive-window bookkeeping that has to happen after an * update has been processed by the worker threads. * * This should be called once the replay threads have done their job * and the window can be opened again. */ public synchronized void updateWindowAfterReplay() { try { rcvWindow--; if (rcvWindow < halfRcvWindow) { session.publish(new WindowMessage(halfRcvWindow)); rcvWindow += halfRcvWindow; } } catch (IOException e) { // Any error on the socket will be handled by the thread calling receive() // just ignore. } } /** * stop the server. */ public void stop() opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java
@@ -869,8 +869,6 @@ while (update == null) { InitializeRequestMessage initMsg = null; synchronized (broker) { ReplicationMessage msg; try { @@ -954,7 +952,6 @@ { // just retry } } // Test if we have received and export request message and // if that's the case handle it now. // This must be done outside of the portion of code protected @@ -1259,7 +1256,10 @@ shutdown = true; // Stop the listener thread if (listenerThread != null) { listenerThread.shutdown(); } synchronized (this) { @@ -1274,6 +1274,7 @@ broker.stop(); // Wait for the listener thread to stop if (listenerThread != null) listenerThread.waitForShutdown(); // wait for completion of the persistentServerState thread. @@ -1441,6 +1442,7 @@ { if (!dependency) { broker.updateWindowAfterReplay(); if (msg.isAssured()) ack(msg.getChangeNumber()); incProcessedUpdates(); opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ProtocolWindowTest.java
@@ -155,6 +155,7 @@ while (true) { broker.receive(); broker.updateWindowAfterReplay(); rcvCount++; } } opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java
@@ -248,6 +248,7 @@ "uid"); server1.publish(msg); ReplicationMessage msg2 = server2.receive(); server2.updateWindowAfterReplay(); if (msg2 instanceof DeleteMsg) { DeleteMsg del = (DeleteMsg) msg2; @@ -263,6 +264,7 @@ msg = new DeleteMsg("o=test", secondChangeNumberServer1, "uid"); server1.publish(msg); msg2 = server2.receive(); server2.updateWindowAfterReplay(); if (msg2 instanceof DeleteMsg) { DeleteMsg del = (DeleteMsg) msg2; @@ -280,6 +282,7 @@ "other-uid"); server2.publish(msg); msg2 = server1.receive(); server1.updateWindowAfterReplay(); if (msg2 instanceof DeleteMsg) { DeleteMsg del = (DeleteMsg) msg2; @@ -295,6 +298,7 @@ msg = new DeleteMsg("o=test", secondChangeNumberServer2, "uid"); server2.publish(msg); msg2 = server1.receive(); server1.updateWindowAfterReplay(); if (msg2 instanceof DeleteMsg) { DeleteMsg del = (DeleteMsg) msg2; @@ -329,6 +333,7 @@ 100, replicationServerPort, 1000, false); ReplicationMessage msg2 = broker.receive(); broker.updateWindowAfterReplay(); if (!(msg2 instanceof DeleteMsg)) fail("ReplicationServer basic transmission failed:" + msg2); else @@ -367,6 +372,7 @@ 100, replicationServerPort, 5000, state); ReplicationMessage msg2 = broker.receive(); broker.updateWindowAfterReplay(); if (!(msg2 instanceof DeleteMsg)) { fail("ReplicationServer basic transmission failed:" + msg2); @@ -776,6 +782,7 @@ msg2 = broker2.receive(); if (msg2 == null) break; broker2.updateWindowAfterReplay(); } catch (Exception e) { @@ -982,6 +989,7 @@ while (true) { ReplicationMessage msg = broker.receive(); broker.updateWindowAfterReplay(); if (msg == null) break; }