From 0f9ee85fd0b36220ef6a3ee8d2b9f5f6f02b26bd Mon Sep 17 00:00:00 2001
From: gbellato <gbellato@localhost>
Date: Mon, 11 Feb 2008 13:42:42 +0000
Subject: [PATCH]
---
opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java | 170 ++++++++++++++++++++++++++++----------------------------
1 files changed, 86 insertions(+), 84 deletions(-)
diff --git a/opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java b/opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java
index 0800236..ad68300 100644
--- a/opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java
+++ b/opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java
@@ -869,91 +869,88 @@
while (update == null)
{
InitializeRequestMessage initMsg = null;
- synchronized (broker)
+ ReplicationMessage msg;
+ try
{
- ReplicationMessage msg;
- try
+ msg = broker.receive();
+ if (msg == null)
{
- msg = broker.receive();
- if (msg == null)
- {
- // The server is in the shutdown process
- return null;
- }
-
- if (debugEnabled())
- if (!(msg instanceof HeartbeatMessage))
- TRACER.debugVerbose("Message received <" + msg + ">");
-
- if (msg instanceof AckMessage)
- {
- AckMessage ack = (AckMessage) msg;
- receiveAck(ack);
- }
- else if (msg instanceof InitializeRequestMessage)
- {
- // Another server requests us to provide entries
- // for a total update
- initMsg = (InitializeRequestMessage)msg;
- }
- else if (msg instanceof InitializeTargetMessage)
- {
- // Another server is exporting its entries to us
- InitializeTargetMessage importMsg = (InitializeTargetMessage) msg;
-
- try
- {
- // This must be done while we are still holding the
- // broker lock because we are now going to receive a
- // bunch of entries from the remote server and we
- // want the import thread to catch them and
- // not the ListenerThread.
- initialize(importMsg);
- }
- catch(DirectoryException de)
- {
- // Returns an error message to notify the sender
- ErrorMessage errorMsg =
- new ErrorMessage(importMsg.getsenderID(),
- de.getMessageObject());
- MessageBuilder mb = new MessageBuilder();
- mb.append(de.getMessageObject());
- TRACER.debugInfo(Message.toString(mb.toMessage()));
- broker.publish(errorMsg);
- }
- }
- else if (msg instanceof ErrorMessage)
- {
- if (ieContext != null)
- {
- // This is an error termination for the 2 following cases :
- // - either during an export
- // - or before an import really started
- // For example, when we publish a request and the
- // replicationServer did not find any import source.
- abandonImportExport((ErrorMessage)msg);
- }
- else
- {
- /* We can receive an error message from the replication server
- * in the following cases :
- * - we connected with an incorrect generation id
- */
- ErrorMessage errorMsg = (ErrorMessage)msg;
- logError(ERR_ERROR_MSG_RECEIVED.get(
- errorMsg.getDetails()));
- }
- }
- else if (msg instanceof UpdateMessage)
- {
- update = (UpdateMessage) msg;
- receiveUpdate(update);
- }
- }
- catch (SocketTimeoutException e)
- {
- // just retry
+ // The server is in the shutdown process
+ return null;
}
+
+ if (debugEnabled())
+ if (!(msg instanceof HeartbeatMessage))
+ TRACER.debugVerbose("Message received <" + msg + ">");
+
+ if (msg instanceof AckMessage)
+ {
+ AckMessage ack = (AckMessage) msg;
+ receiveAck(ack);
+ }
+ else if (msg instanceof InitializeRequestMessage)
+ {
+ // Another server requests us to provide entries
+ // for a total update
+ initMsg = (InitializeRequestMessage)msg;
+ }
+ else if (msg instanceof InitializeTargetMessage)
+ {
+ // Another server is exporting its entries to us
+ InitializeTargetMessage importMsg = (InitializeTargetMessage) msg;
+
+ try
+ {
+ // This must be done while we are still holding the
+ // broker lock because we are now going to receive a
+ // bunch of entries from the remote server and we
+ // want the import thread to catch them and
+ // not the ListenerThread.
+ initialize(importMsg);
+ }
+ catch(DirectoryException de)
+ {
+ // Returns an error message to notify the sender
+ ErrorMessage errorMsg =
+ new ErrorMessage(importMsg.getsenderID(),
+ de.getMessageObject());
+ MessageBuilder mb = new MessageBuilder();
+ mb.append(de.getMessageObject());
+ TRACER.debugInfo(Message.toString(mb.toMessage()));
+ broker.publish(errorMsg);
+ }
+ }
+ else if (msg instanceof ErrorMessage)
+ {
+ if (ieContext != null)
+ {
+ // This is an error termination for the 2 following cases :
+ // - either during an export
+ // - or before an import really started
+ // For example, when we publish a request and the
+ // replicationServer did not find any import source.
+ abandonImportExport((ErrorMessage)msg);
+ }
+ else
+ {
+ /* We can receive an error message from the replication server
+ * in the following cases :
+ * - we connected with an incorrect generation id
+ */
+ ErrorMessage errorMsg = (ErrorMessage)msg;
+ logError(ERR_ERROR_MSG_RECEIVED.get(
+ errorMsg.getDetails()));
+ }
+ }
+ else if (msg instanceof UpdateMessage)
+ {
+ update = (UpdateMessage) msg;
+ receiveUpdate(update);
+ }
+ }
+ catch (SocketTimeoutException e)
+ {
+ // just retry
}
// Test if we have received and export request message and
// if that's the case handle it now.
@@ -1259,7 +1256,10 @@
shutdown = true;
// Stop the listener thread
- listenerThread.shutdown();
+ if (listenerThread != null)
+ {
+ listenerThread.shutdown();
+ }
synchronized (this)
{
@@ -1274,7 +1274,8 @@
broker.stop();
// Wait for the listener thread to stop
- listenerThread.waitForShutdown();
+ if (listenerThread != null)
+ listenerThread.waitForShutdown();
// wait for completion of the persistentServerState thread.
try
@@ -1441,6 +1442,7 @@
{
if (!dependency)
{
+ broker.updateWindowAfterReplay();
if (msg.isAssured())
ack(msg.getChangeNumber());
incProcessedUpdates();
--
Gitblit v1.10.0