mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

gbellato
27.58.2007 c6c3de416bcc406346299a860905c9e71870a4ab
complement for issue 2097 : total update fails sending a Message to ReplicationCache

When debugging 2097 I've noticed that during a replication total update,
if the updated server fails, the publisher thread on the other server hangs.

The hang happens because the publisher thread normally never tries to reconnect
after a protocol session failure; it waits for the receiver threads to do that job. Here, however,
the publishing activity is done in a receiver thread that is holding the session lock,
which therefore prevents the other threads from doing their job.

The solution is to move this work outside of the lock.

I've tested this manually as it is difficult to automate.
5 files modified
105 ■■■■ changed files
opends/src/messages/src/org/opends/messages/MessageDescriptor.java 5 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java 50 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ReplicationCache.java 28 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/tasks/InitializeTask.java 11 ●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/InitOnLineTest.java 11 ●●●●● patch | view | raw | blame | history
opends/src/messages/src/org/opends/messages/MessageDescriptor.java
@@ -963,7 +963,10 @@
   * @return int ordinal value
   */
  public int getOrdinal() {
    return this.ordinal;
    if (this.ordinal == null)
      return 0;
    else
      return this.ordinal;
  }
  /**
opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java
@@ -799,9 +799,10 @@
    if (update == null)
    {
      synchronized (broker)
      while (update == null)
      {
        while (update == null)
        InitializeRequestMessage initMsg = null;
        synchronized (broker)
        {
          ReplicationMessage msg;
          try
@@ -822,32 +823,27 @@
            {
              // Another server requests us to provide entries
              // for a total update
              InitializeRequestMessage initMsg = (InitializeRequestMessage) msg;
              try
              {
                initializeTarget(initMsg.getsenderID(), initMsg.getsenderID(),
                                 null);
              }
              catch(DirectoryException de)
              {
                // An error message has been sent to the peer
                // Nothing more to do locally
              }
              initMsg = (InitializeRequestMessage) msg;
            }
            else if (msg instanceof InitializeTargetMessage)
            {
              // Another server is exporting its entries to us
              InitializeTargetMessage initMsg = (InitializeTargetMessage) msg;
              InitializeTargetMessage importMsg = (InitializeTargetMessage) msg;
              try
              {
                importBackend(initMsg);
                // This must be done while we are still holding the
                // broker lock because we are now going to receive a
                // bunch of entries from the remote server and we
                // want the import thread to catch them and
                // not the ListenerThread.
                importBackend(importMsg);
              }
              catch(DirectoryException de)
              {
                // Return an error message to notify the sender
                ErrorMessage errorMsg =
                  new ErrorMessage(initMsg.getsenderID(),
                  new ErrorMessage(importMsg.getsenderID(),
                                   de.getMessageObject());
                MessageBuilder mb = new MessageBuilder();
                mb.append(de.getMessageObject());
@@ -880,6 +876,28 @@
            // just retry
          }
        }
        // Test if we have received an export request message and
        // if that's the case handle it now.
        // This must be done outside of the portion of code protected
        // by the broker lock so that we keep receiving updates
        // while we are doing an export and so that a possible
        // closure of the socket happening when we are publishing the
        // entries to the remote can be handled by the other
        // ListenerThreads when they call this method and therefore the
        // broker.receive() method.
        if (initMsg != null)
        {
          try
          {
            initializeTarget(initMsg.getsenderID(), initMsg.getsenderID(),
                null);
          }
          catch(DirectoryException de)
          {
            // An error message has been sent to the peer
            // Nothing more to do locally
          }
        }
      }
    }
    return update;
opends/src/server/org/opends/server/replication/server/ReplicationCache.java
@@ -583,8 +583,8 @@
      {
        // TODO Handle error properly (sender timeout in addition)
        /*
         * An error happened trying the send back an ack to this server.
         * Log an error and close the connection to this server.
         * An error happened trying to send back an error to this server.
         * Log an error and close the connection to the sender server.
         */
        MessageBuilder mb2 = new MessageBuilder();
        mb2.append(ERR_CHANGELOG_ERROR_SENDING_ERROR.get(this.toString()));
@@ -604,8 +604,9 @@
        catch(IOException ioe)
        {
          /*
           * An error happened trying the send back an ack to this server.
           * Log an error and close the connection to this server.
           * An error happened trying to send a routable message
           * to its destination server.
           * Send back an error to the originator of the message.
           */
          MessageBuilder mb = new MessageBuilder();
          mb.append(ERR_CHANGELOG_ERROR_SENDING_MSG.get(this.toString()));
@@ -613,7 +614,24 @@
          mb.append(" ");
          mb.append(msg.getClass().getCanonicalName());
          logError(mb.toMessage());
          senderHandler.shutdown();
          MessageBuilder mb1 = new MessageBuilder();
          mb1.append(ERR_NO_REACHABLE_PEER_IN_THE_DOMAIN.get());
          mb1.append("serverID:" + msg.getDestination());
          ErrorMessage errMsg = new ErrorMessage(
              msg.getsenderID(), mb1.toMessage());
          try
          {
            senderHandler.send(errMsg);
          }
          catch(IOException ioe1)
          {
            // an error happened on the sender session trying to recover
            // from an error on the receiver session.
            // We don't have much solution left beside closing the sessions.
            senderHandler.shutdown();
            targetHandler.shutdown();
          }
          // TODO Handle error properly (sender timeout in addition)
        }
      }
opends/src/server/org/opends/server/tasks/InitializeTask.java
@@ -25,6 +25,9 @@
 *      Portions Copyright 2006-2007 Sun Microsystems, Inc.
 */
package org.opends.server.tasks;
import org.opends.messages.Message;
import org.opends.messages.MessageBuilder;
import static org.opends.server.config.ConfigConstants.*;
import static org.opends.server.core.DirectoryServer.getAttributeType;
import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
@@ -32,7 +35,6 @@
import java.util.List;
import org.opends.messages.MessageBuilder;
import org.opends.messages.TaskMessages;
import org.opends.server.backends.task.Task;
import org.opends.server.backends.task.TaskState;
@@ -73,6 +75,8 @@
  // completed
  long left = 0;
  private Message initTaskError = null;
  /**
   * {@inheritDoc}
   */
@@ -160,6 +164,9 @@
      initState = TaskState.STOPPED_BY_ERROR;
    }
    if (initTaskError != null)
      logError(initTaskError);
    if (debugEnabled())
    {
      TRACER.debugInfo("InitializeTask is ending with state:%s",
@@ -181,7 +188,7 @@
    {
      if (de != null)
      {
        logError(de.getMessageObject());
        initTaskError = de.getMessageObject();
      }
      if (debugEnabled())
      {
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/InitOnLineTest.java
@@ -111,7 +111,6 @@
  private static final DebugTracer TRACER = getTracer();
  private static final int WINDOW_SIZE = 10;
  private static final int CHANGELOG_QUEUE_SIZE = 100;
  /**
   * A "person" entry
@@ -501,15 +500,13 @@
          // Check that the left counter.
          AttributeType taskStateType =
            DirectoryServer.getAttributeType(ATTR_TASK_INITIALIZE_LEFT, true);
          String leftString =
            resultEntry.getAttributeValue(taskStateType,
          resultEntry.getAttributeValue(taskStateType,
                DirectoryStringSyntax.DECODER);
          // Check that the total counter.
          taskStateType =
           DirectoryServer.getAttributeType(ATTR_TASK_INITIALIZE_DONE, true);
          String totalString =
           resultEntry.getAttributeValue(taskStateType,
          resultEntry.getAttributeValue(taskStateType,
               DirectoryStringSyntax.DECODER);
        }
        catch(Exception e)
@@ -544,7 +541,7 @@
            log(logMessages.get(0));
            log(expectedMessage.toString());
            assertTrue(logMessages.get(0).indexOf(
                expectedMessage.toString())>0);
                expectedMessage.toString())>=0);
          }
        }
      }
@@ -1387,7 +1384,7 @@
    String testCase = "InitializeNoSource";
    log("Starting "+testCase);
    // Start SS
    // Start Replication Server
    changelog1 = createChangelogServer(changelog1ID);
    // Creates config to synchronize suffix