From 2ec1e20dacc4606317fc9e38890117df638a1fff Mon Sep 17 00:00:00 2001
From: gbellato <gbellato@localhost>
Date: Mon, 11 Feb 2008 13:42:42 +0000
Subject: [PATCH]
---
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ProtocolWindowTest.java | 1
opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java | 36 ++++++--
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java | 8 ++
opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplayThread.java | 5
opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java | 170 +++++++++++++++++++++---------------------
5 files changed, 124 insertions(+), 96 deletions(-)
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplayThread.java b/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplayThread.java
index 3774933..3ff5c08 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplayThread.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplayThread.java
@@ -56,6 +56,7 @@
private BlockingQueue<UpdateToReplay> updateToReplayQueue = null;
private boolean shutdown = false;
private boolean done = false;
+ private static int count = 0;
/**
* Constructor for the ReplayThread.
@@ -64,7 +65,7 @@
*/
public ReplayThread(BlockingQueue<UpdateToReplay> updateToReplayQueue)
{
- super("Replication Replay thread");
+ super("Replication Replay thread " + count++);
this.updateToReplayQueue = updateToReplayQueue;
}
@@ -130,7 +131,7 @@
{
try
{
- while (done == false)
+ while ((done == false) && (this.isAlive()))
{
Thread.sleep(50);
}
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java b/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java
index a21ce43..2eeedb0 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java
@@ -954,17 +954,9 @@
{
WindowMessage windowMsg = (WindowMessage) msg;
sendWindow.release(windowMsg.getNumAck());
- } else
+ }
+ else
{
- if (msg instanceof UpdateMessage)
- {
- rcvWindow--;
- if (rcvWindow < halfRcvWindow)
- {
- session.publish(new WindowMessage(halfRcvWindow));
- rcvWindow += halfRcvWindow;
- }
- }
return msg;
}
} catch (SocketTimeoutException e)
@@ -988,6 +980,30 @@
}
/**
+ * This method allows to do the necessary computing for the window
+ * management after treatment by the worker threads.
+ *
+ * This should be called once the replay thread have done their job
+ * and the window can be open again.
+ */
+ public synchronized void updateWindowAfterReplay()
+ {
+ try
+ {
+ rcvWindow--;
+ if (rcvWindow < halfRcvWindow)
+ {
+ session.publish(new WindowMessage(halfRcvWindow));
+ rcvWindow += halfRcvWindow;
+ }
+ } catch (IOException e)
+ {
+ // Any error on the socket will be handled by the thread calling receive()
+ // just ignore.
+ }
+ }
+
+ /**
* stop the server.
*/
public void stop()
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java b/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java
index 0800236..ad68300 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java
@@ -869,91 +869,88 @@
while (update == null)
{
InitializeRequestMessage initMsg = null;
- synchronized (broker)
+ ReplicationMessage msg;
+ try
{
- ReplicationMessage msg;
- try
+ msg = broker.receive();
+ if (msg == null)
{
- msg = broker.receive();
- if (msg == null)
- {
- // The server is in the shutdown process
- return null;
- }
-
- if (debugEnabled())
- if (!(msg instanceof HeartbeatMessage))
- TRACER.debugVerbose("Message received <" + msg + ">");
-
- if (msg instanceof AckMessage)
- {
- AckMessage ack = (AckMessage) msg;
- receiveAck(ack);
- }
- else if (msg instanceof InitializeRequestMessage)
- {
- // Another server requests us to provide entries
- // for a total update
- initMsg = (InitializeRequestMessage)msg;
- }
- else if (msg instanceof InitializeTargetMessage)
- {
- // Another server is exporting its entries to us
- InitializeTargetMessage importMsg = (InitializeTargetMessage) msg;
-
- try
- {
- // This must be done while we are still holding the
- // broker lock because we are now going to receive a
- // bunch of entries from the remote server and we
- // want the import thread to catch them and
- // not the ListenerThread.
- initialize(importMsg);
- }
- catch(DirectoryException de)
- {
- // Returns an error message to notify the sender
- ErrorMessage errorMsg =
- new ErrorMessage(importMsg.getsenderID(),
- de.getMessageObject());
- MessageBuilder mb = new MessageBuilder();
- mb.append(de.getMessageObject());
- TRACER.debugInfo(Message.toString(mb.toMessage()));
- broker.publish(errorMsg);
- }
- }
- else if (msg instanceof ErrorMessage)
- {
- if (ieContext != null)
- {
- // This is an error termination for the 2 following cases :
- // - either during an export
- // - or before an import really started
- // For example, when we publish a request and the
- // replicationServer did not find any import source.
- abandonImportExport((ErrorMessage)msg);
- }
- else
- {
- /* We can receive an error message from the replication server
- * in the following cases :
- * - we connected with an incorrect generation id
- */
- ErrorMessage errorMsg = (ErrorMessage)msg;
- logError(ERR_ERROR_MSG_RECEIVED.get(
- errorMsg.getDetails()));
- }
- }
- else if (msg instanceof UpdateMessage)
- {
- update = (UpdateMessage) msg;
- receiveUpdate(update);
- }
- }
- catch (SocketTimeoutException e)
- {
- // just retry
+ // The server is in the shutdown process
+ return null;
}
+
+ if (debugEnabled())
+ if (!(msg instanceof HeartbeatMessage))
+ TRACER.debugVerbose("Message received <" + msg + ">");
+
+ if (msg instanceof AckMessage)
+ {
+ AckMessage ack = (AckMessage) msg;
+ receiveAck(ack);
+ }
+ else if (msg instanceof InitializeRequestMessage)
+ {
+ // Another server requests us to provide entries
+ // for a total update
+ initMsg = (InitializeRequestMessage)msg;
+ }
+ else if (msg instanceof InitializeTargetMessage)
+ {
+ // Another server is exporting its entries to us
+ InitializeTargetMessage importMsg = (InitializeTargetMessage) msg;
+
+ try
+ {
+ // This must be done while we are still holding the
+ // broker lock because we are now going to receive a
+ // bunch of entries from the remote server and we
+ // want the import thread to catch them and
+ // not the ListenerThread.
+ initialize(importMsg);
+ }
+ catch(DirectoryException de)
+ {
+ // Returns an error message to notify the sender
+ ErrorMessage errorMsg =
+ new ErrorMessage(importMsg.getsenderID(),
+ de.getMessageObject());
+ MessageBuilder mb = new MessageBuilder();
+ mb.append(de.getMessageObject());
+ TRACER.debugInfo(Message.toString(mb.toMessage()));
+ broker.publish(errorMsg);
+ }
+ }
+ else if (msg instanceof ErrorMessage)
+ {
+ if (ieContext != null)
+ {
+ // This is an error termination for the 2 following cases :
+ // - either during an export
+ // - or before an import really started
+ // For example, when we publish a request and the
+ // replicationServer did not find any import source.
+ abandonImportExport((ErrorMessage)msg);
+ }
+ else
+ {
+ /* We can receive an error message from the replication server
+ * in the following cases :
+ * - we connected with an incorrect generation id
+ */
+ ErrorMessage errorMsg = (ErrorMessage)msg;
+ logError(ERR_ERROR_MSG_RECEIVED.get(
+ errorMsg.getDetails()));
+ }
+ }
+ else if (msg instanceof UpdateMessage)
+ {
+ update = (UpdateMessage) msg;
+ receiveUpdate(update);
+ }
+ }
+ catch (SocketTimeoutException e)
+ {
+ // just retry
}
// Test if we have received and export request message and
// if that's the case handle it now.
@@ -1259,7 +1256,10 @@
shutdown = true;
// Stop the listener thread
- listenerThread.shutdown();
+ if (listenerThread != null)
+ {
+ listenerThread.shutdown();
+ }
synchronized (this)
{
@@ -1274,7 +1274,8 @@
broker.stop();
// Wait for the listener thread to stop
- listenerThread.waitForShutdown();
+ if (listenerThread != null)
+ listenerThread.waitForShutdown();
// wait for completion of the persistentServerState thread.
try
@@ -1441,6 +1442,7 @@
{
if (!dependency)
{
+ broker.updateWindowAfterReplay();
if (msg.isAssured())
ack(msg.getChangeNumber());
incProcessedUpdates();
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ProtocolWindowTest.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ProtocolWindowTest.java
index 2d673c2..6be17f5 100644
--- a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ProtocolWindowTest.java
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ProtocolWindowTest.java
@@ -155,6 +155,7 @@
while (true)
{
broker.receive();
+ broker.updateWindowAfterReplay();
rcvCount++;
}
}
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java
index 8186d82..1494068 100644
--- a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java
@@ -248,6 +248,7 @@
"uid");
server1.publish(msg);
ReplicationMessage msg2 = server2.receive();
+ server2.updateWindowAfterReplay();
if (msg2 instanceof DeleteMsg)
{
DeleteMsg del = (DeleteMsg) msg2;
@@ -263,6 +264,7 @@
msg = new DeleteMsg("o=test", secondChangeNumberServer1, "uid");
server1.publish(msg);
msg2 = server2.receive();
+ server2.updateWindowAfterReplay();
if (msg2 instanceof DeleteMsg)
{
DeleteMsg del = (DeleteMsg) msg2;
@@ -280,6 +282,7 @@
"other-uid");
server2.publish(msg);
msg2 = server1.receive();
+ server1.updateWindowAfterReplay();
if (msg2 instanceof DeleteMsg)
{
DeleteMsg del = (DeleteMsg) msg2;
@@ -295,6 +298,7 @@
msg = new DeleteMsg("o=test", secondChangeNumberServer2, "uid");
server2.publish(msg);
msg2 = server1.receive();
+ server1.updateWindowAfterReplay();
if (msg2 instanceof DeleteMsg)
{
DeleteMsg del = (DeleteMsg) msg2;
@@ -329,6 +333,7 @@
100, replicationServerPort, 1000, false);
ReplicationMessage msg2 = broker.receive();
+ broker.updateWindowAfterReplay();
if (!(msg2 instanceof DeleteMsg))
fail("ReplicationServer basic transmission failed:" + msg2);
else
@@ -367,6 +372,7 @@
100, replicationServerPort, 5000, state);
ReplicationMessage msg2 = broker.receive();
+ broker.updateWindowAfterReplay();
if (!(msg2 instanceof DeleteMsg))
{
fail("ReplicationServer basic transmission failed:" + msg2);
@@ -776,6 +782,7 @@
msg2 = broker2.receive();
if (msg2 == null)
break;
+ broker2.updateWindowAfterReplay();
}
catch (Exception e)
{
@@ -982,6 +989,7 @@
while (true)
{
ReplicationMessage msg = broker.receive();
+ broker.updateWindowAfterReplay();
if (msg == null)
break;
}
--
Gitblit v1.10.0