opends/src/messages/src/org/opends/messages/MessageDescriptor.java
@@ -963,7 +963,10 @@ * @return int ordinal value */ public int getOrdinal() { return this.ordinal; if (this.ordinal == null) return 0; else return this.ordinal; } /** opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java
@@ -799,9 +799,10 @@ if (update == null) { synchronized (broker) while (update == null) { while (update == null) InitializeRequestMessage initMsg = null; synchronized (broker) { ReplicationMessage msg; try @@ -822,32 +823,27 @@ { // Another server requests us to provide entries // for a total update InitializeRequestMessage initMsg = (InitializeRequestMessage) msg; try { initializeTarget(initMsg.getsenderID(), initMsg.getsenderID(), null); } catch(DirectoryException de) { // An error message has been sent to the peer // Nothing more to do locally } initMsg = (InitializeRequestMessage) msg; } else if (msg instanceof InitializeTargetMessage) { // Another server is exporting its entries to us InitializeTargetMessage initMsg = (InitializeTargetMessage) msg; InitializeTargetMessage importMsg = (InitializeTargetMessage) msg; try { importBackend(initMsg); // This must be done while we are still holding the // broker lock because we are now going to receive a // bunch of entries from the remote server and we // want the import thread to catch them and // not the ListenerThread. importBackend(importMsg); } catch(DirectoryException de) { // Return an error message to notify the sender ErrorMessage errorMsg = new ErrorMessage(initMsg.getsenderID(), new ErrorMessage(importMsg.getsenderID(), de.getMessageObject()); MessageBuilder mb = new MessageBuilder(); mb.append(de.getMessageObject()); @@ -880,6 +876,28 @@ // just retry } } // Test if we have received and export request message and // if that's the case handle it now. // This must be done outside of the portion of code protected // by the broker lock so that we keep receiveing update // when we are doing and export and so that a possible // closure of the socket happening when we are publishing the // entries to the remote can be handled by the other // ListenerThread when they call this method and therefore the // broker.receive() method. if (initMsg != null) { try { initializeTarget(initMsg.getsenderID(), initMsg.getsenderID(), null); } catch(DirectoryException de) { // An error message has been sent to the peer // Nothing more to do locally } } } } return update; opends/src/server/org/opends/server/replication/server/ReplicationCache.java
@@ -583,8 +583,8 @@ { // TODO Handle error properly (sender timeout in addition) /* * An error happened trying the send back an ack to this server. * Log an error and close the connection to this server. * An error happened trying the send back an error to this server. * Log an error and close the connection to the sender server. */ MessageBuilder mb2 = new MessageBuilder(); mb2.append(ERR_CHANGELOG_ERROR_SENDING_ERROR.get(this.toString())); @@ -604,8 +604,9 @@ catch(IOException ioe) { /* * An error happened trying the send back an ack to this server. * Log an error and close the connection to this server. * An error happened trying the send a routabled message * to its destination server. * Send back an error to the originator of the message. */ MessageBuilder mb = new MessageBuilder(); mb.append(ERR_CHANGELOG_ERROR_SENDING_MSG.get(this.toString())); @@ -613,7 +614,24 @@ mb.append(" "); mb.append(msg.getClass().getCanonicalName()); logError(mb.toMessage()); senderHandler.shutdown(); MessageBuilder mb1 = new MessageBuilder(); mb1.append(ERR_NO_REACHABLE_PEER_IN_THE_DOMAIN.get()); mb1.append("serverID:" + msg.getDestination()); ErrorMessage errMsg = new ErrorMessage( msg.getsenderID(), mb1.toMessage()); try { senderHandler.send(errMsg); } catch(IOException ioe1) { // an error happened on the sender session trying to recover // from an error on the receiver session. // We don't have much solution left beside closing the sessions. senderHandler.shutdown(); targetHandler.shutdown(); } // TODO Handle error properly (sender timeout in addition) } } opends/src/server/org/opends/server/tasks/InitializeTask.java
@@ -25,6 +25,9 @@ * Portions Copyright 2006-2007 Sun Microsystems, Inc. */ package org.opends.server.tasks; import org.opends.messages.Message; import org.opends.messages.MessageBuilder; import static org.opends.server.config.ConfigConstants.*; import static org.opends.server.core.DirectoryServer.getAttributeType; import static org.opends.server.loggers.debug.DebugLogger.debugEnabled; @@ -32,7 +35,6 @@ import java.util.List; import org.opends.messages.MessageBuilder; import org.opends.messages.TaskMessages; import org.opends.server.backends.task.Task; import org.opends.server.backends.task.TaskState; @@ -73,6 +75,8 @@ // completed long left = 0; private Message initTaskError = null; /** * {@inheritDoc} */ @@ -160,6 +164,9 @@ initState = TaskState.STOPPED_BY_ERROR; } if (initTaskError != null) logError(initTaskError); if (debugEnabled()) { TRACER.debugInfo("InitializeTask is ending with state:%s", @@ -181,7 +188,7 @@ { if (de != null) { logError(de.getMessageObject()); initTaskError = de.getMessageObject(); } if (debugEnabled()) { opends/tests/unit-tests-testng/src/server/org/opends/server/replication/InitOnLineTest.java
@@ -111,7 +111,6 @@ private static final DebugTracer TRACER = getTracer(); private static final int WINDOW_SIZE = 10; private static final int CHANGELOG_QUEUE_SIZE = 100; /** * A "person" entry @@ -501,15 +500,13 @@ // Check that the left counter. AttributeType taskStateType = DirectoryServer.getAttributeType(ATTR_TASK_INITIALIZE_LEFT, true); String leftString = resultEntry.getAttributeValue(taskStateType, resultEntry.getAttributeValue(taskStateType, DirectoryStringSyntax.DECODER); // Check that the total counter. taskStateType = DirectoryServer.getAttributeType(ATTR_TASK_INITIALIZE_DONE, true); String totalString = resultEntry.getAttributeValue(taskStateType, resultEntry.getAttributeValue(taskStateType, DirectoryStringSyntax.DECODER); } catch(Exception e) @@ -544,7 +541,7 @@ log(logMessages.get(0)); log(expectedMessage.toString()); assertTrue(logMessages.get(0).indexOf( expectedMessage.toString())>0); expectedMessage.toString())>=0); } } } @@ -1387,7 +1384,7 @@ String testCase = "InitializeNoSource"; log("Starting "+testCase); // Start SS // Start Replication Server changelog1 = createChangelogServer(changelog1ID); // Creates config to synchronize suffix