From c6c3de416bcc406346299a860905c9e71870a4ab Mon Sep 17 00:00:00 2001
From: gbellato <gbellato@localhost>
Date: Mon, 27 Aug 2007 11:58:47 +0000
Subject: [PATCH] complement for issue 2097 : total update fails sending a Message to ReplicationCache
---
opends/src/server/org/opends/server/replication/server/ReplicationCache.java | 28 +++++++++++--
opends/src/server/org/opends/server/tasks/InitializeTask.java | 11 ++++-
opends/src/messages/src/org/opends/messages/MessageDescriptor.java | 5 ++
opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java | 50 +++++++++++++++++--------
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/InitOnLineTest.java | 11 ++---
5 files changed, 74 insertions(+), 31 deletions(-)
diff --git a/opends/src/messages/src/org/opends/messages/MessageDescriptor.java b/opends/src/messages/src/org/opends/messages/MessageDescriptor.java
index aa6365f..9e39e19 100644
--- a/opends/src/messages/src/org/opends/messages/MessageDescriptor.java
+++ b/opends/src/messages/src/org/opends/messages/MessageDescriptor.java
@@ -963,7 +963,10 @@
* @return int ordinal value
*/
public int getOrdinal() {
- return this.ordinal;
+ if (this.ordinal == null)
+ return 0;
+ else
+ return this.ordinal;
}
/**
diff --git a/opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java b/opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java
index 5ed95cb..05233d3 100644
--- a/opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java
+++ b/opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java
@@ -799,9 +799,10 @@
if (update == null)
{
- synchronized (broker)
+ while (update == null)
{
- while (update == null)
+ InitializeRequestMessage initMsg = null;
+ synchronized (broker)
{
ReplicationMessage msg;
try
@@ -822,32 +823,27 @@
{
// Another server requests us to provide entries
// for a total update
- InitializeRequestMessage initMsg = (InitializeRequestMessage) msg;
- try
- {
- initializeTarget(initMsg.getsenderID(), initMsg.getsenderID(),
- null);
- }
- catch(DirectoryException de)
- {
- // An error message has been sent to the peer
- // Nothing more to do locally
- }
+ initMsg = (InitializeRequestMessage) msg;
}
else if (msg instanceof InitializeTargetMessage)
{
// Another server is exporting its entries to us
- InitializeTargetMessage initMsg = (InitializeTargetMessage) msg;
+ InitializeTargetMessage importMsg = (InitializeTargetMessage) msg;
try
{
- importBackend(initMsg);
+ // This must be done while we are still holding the
+ // broker lock because we are now going to receive a
+ // bunch of entries from the remote server and we
+ // want the import thread to catch them and
+ // not the ListenerThread.
+ importBackend(importMsg);
}
catch(DirectoryException de)
{
// Return an error message to notify the sender
ErrorMessage errorMsg =
- new ErrorMessage(initMsg.getsenderID(),
+ new ErrorMessage(importMsg.getsenderID(),
de.getMessageObject());
MessageBuilder mb = new MessageBuilder();
mb.append(de.getMessageObject());
@@ -880,6 +876,28 @@
// just retry
}
}
+ // Test if we have received and export request message and
+ // if that's the case handle it now.
+ // This must be done outside of the portion of code protected
+ // by the broker lock so that we keep receiveing update
+ // when we are doing and export and so that a possible
+ // closure of the socket happening when we are publishing the
+ // entries to the remote can be handled by the other
+ // ListenerThread when they call this method and therefore the
+ // broker.receive() method.
+ if (initMsg != null)
+ {
+ try
+ {
+ initializeTarget(initMsg.getsenderID(), initMsg.getsenderID(),
+ null);
+ }
+ catch(DirectoryException de)
+ {
+ // An error message has been sent to the peer
+ // Nothing more to do locally
+ }
+ }
}
}
return update;
diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationCache.java b/opends/src/server/org/opends/server/replication/server/ReplicationCache.java
index ef31ab3..d618cf8 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationCache.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationCache.java
@@ -583,8 +583,8 @@
{
// TODO Handle error properly (sender timeout in addition)
/*
- * An error happened trying the send back an ack to this server.
- * Log an error and close the connection to this server.
+ * An error happened trying the send back an error to this server.
+ * Log an error and close the connection to the sender server.
*/
MessageBuilder mb2 = new MessageBuilder();
mb2.append(ERR_CHANGELOG_ERROR_SENDING_ERROR.get(this.toString()));
@@ -604,8 +604,9 @@
catch(IOException ioe)
{
/*
- * An error happened trying the send back an ack to this server.
- * Log an error and close the connection to this server.
+ * An error happened trying the send a routabled message
+ * to its destination server.
+ * Send back an error to the originator of the message.
*/
MessageBuilder mb = new MessageBuilder();
mb.append(ERR_CHANGELOG_ERROR_SENDING_MSG.get(this.toString()));
@@ -613,7 +614,24 @@
mb.append(" ");
mb.append(msg.getClass().getCanonicalName());
logError(mb.toMessage());
- senderHandler.shutdown();
+
+ MessageBuilder mb1 = new MessageBuilder();
+ mb1.append(ERR_NO_REACHABLE_PEER_IN_THE_DOMAIN.get());
+ mb1.append("serverID:" + msg.getDestination());
+ ErrorMessage errMsg = new ErrorMessage(
+ msg.getsenderID(), mb1.toMessage());
+ try
+ {
+ senderHandler.send(errMsg);
+ }
+ catch(IOException ioe1)
+ {
+ // an error happened on the sender session trying to recover
+ // from an error on the receiver session.
+ // We don't have much solution left beside closing the sessions.
+ senderHandler.shutdown();
+ targetHandler.shutdown();
+ }
// TODO Handle error properly (sender timeout in addition)
}
}
diff --git a/opends/src/server/org/opends/server/tasks/InitializeTask.java b/opends/src/server/org/opends/server/tasks/InitializeTask.java
index 0524545..05afb59 100644
--- a/opends/src/server/org/opends/server/tasks/InitializeTask.java
+++ b/opends/src/server/org/opends/server/tasks/InitializeTask.java
@@ -25,6 +25,9 @@
* Portions Copyright 2006-2007 Sun Microsystems, Inc.
*/
package org.opends.server.tasks;
+import org.opends.messages.Message;
+import org.opends.messages.MessageBuilder;
+
import static org.opends.server.config.ConfigConstants.*;
import static org.opends.server.core.DirectoryServer.getAttributeType;
import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
@@ -32,7 +35,6 @@
import java.util.List;
-import org.opends.messages.MessageBuilder;
import org.opends.messages.TaskMessages;
import org.opends.server.backends.task.Task;
import org.opends.server.backends.task.TaskState;
@@ -73,6 +75,8 @@
// completed
long left = 0;
+ private Message initTaskError = null;
+
/**
* {@inheritDoc}
*/
@@ -160,6 +164,9 @@
initState = TaskState.STOPPED_BY_ERROR;
}
+ if (initTaskError != null)
+ logError(initTaskError);
+
if (debugEnabled())
{
TRACER.debugInfo("InitializeTask is ending with state:%s",
@@ -181,7 +188,7 @@
{
if (de != null)
{
- logError(de.getMessageObject());
+ initTaskError = de.getMessageObject();
}
if (debugEnabled())
{
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/InitOnLineTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/InitOnLineTest.java
index 0236f64..a119ab3 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/InitOnLineTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/InitOnLineTest.java
@@ -111,7 +111,6 @@
private static final DebugTracer TRACER = getTracer();
private static final int WINDOW_SIZE = 10;
- private static final int CHANGELOG_QUEUE_SIZE = 100;
/**
* A "person" entry
@@ -501,15 +500,13 @@
// Check that the left counter.
AttributeType taskStateType =
DirectoryServer.getAttributeType(ATTR_TASK_INITIALIZE_LEFT, true);
- String leftString =
- resultEntry.getAttributeValue(taskStateType,
+ resultEntry.getAttributeValue(taskStateType,
DirectoryStringSyntax.DECODER);
// Check that the total counter.
taskStateType =
DirectoryServer.getAttributeType(ATTR_TASK_INITIALIZE_DONE, true);
- String totalString =
- resultEntry.getAttributeValue(taskStateType,
+ resultEntry.getAttributeValue(taskStateType,
DirectoryStringSyntax.DECODER);
}
catch(Exception e)
@@ -544,7 +541,7 @@
log(logMessages.get(0));
log(expectedMessage.toString());
assertTrue(logMessages.get(0).indexOf(
- expectedMessage.toString())>0);
+ expectedMessage.toString())>=0);
}
}
}
@@ -1387,7 +1384,7 @@
String testCase = "InitializeNoSource";
log("Starting "+testCase);
- // Start SS
+ // Start Replication Server
changelog1 = createChangelogServer(changelog1ID);
// Creates config to synchronize suffix
--
Gitblit v1.10.0