mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

gbellato
11.42.2008 0f9ee85fd0b36220ef6a3ee8d2b9f5f6f02b26bd

With the changes made in revision 3695
(issue 1288: replication should not create 10 threads for each replication domain),
the window mechanism from the Replication Server to the LDAP server no longer works,
because the window ACK is sent as soon as the LDAP server has read the message from
the socket, whereas it should be sent only after the operation has been replayed.

This can cause out-of-memory problems and inaccurate monitoring.

The fix moves sending the WindowMessage response from the ListenerThread
to the ReplayThread, at the end of message processing.
5 files modified
220 ■■■■■ changed files
opends/src/server/org/opends/server/replication/plugin/ReplayThread.java 5 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java 36 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java 170 ●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ProtocolWindowTest.java 1 ●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java 8 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/plugin/ReplayThread.java
@@ -56,6 +56,7 @@
  private BlockingQueue<UpdateToReplay> updateToReplayQueue = null;
  private boolean shutdown = false;
  private boolean done = false;
  private static int count = 0;
  /**
   * Constructor for the ReplayThread.
@@ -64,7 +65,7 @@
   */
  public ReplayThread(BlockingQueue<UpdateToReplay> updateToReplayQueue)
  {
     super("Replication Replay thread");
     super("Replication Replay thread " + count++);
     this.updateToReplayQueue = updateToReplayQueue;
  }
@@ -130,7 +131,7 @@
  {
    try
    {
      while (done == false)
      while ((done == false) && (this.isAlive()))
      {
        Thread.sleep(50);
      }
opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java
@@ -954,17 +954,9 @@
        {
          WindowMessage windowMsg = (WindowMessage) msg;
          sendWindow.release(windowMsg.getNumAck());
        } else
        }
        else
        {
          if (msg instanceof UpdateMessage)
          {
            rcvWindow--;
            if (rcvWindow < halfRcvWindow)
            {
              session.publish(new WindowMessage(halfRcvWindow));
              rcvWindow += halfRcvWindow;
            }
          }
          return msg;
        }
      } catch (SocketTimeoutException e)
@@ -988,6 +980,30 @@
  }
  /**
   * This method allows to do the necessary computing for the window
   * management after treatment by the worker threads.
   *
   * This should be called once the replay thread have done their job
   * and the window can be open again.
   */
  public synchronized void updateWindowAfterReplay()
  {
    try
    {
      rcvWindow--;
      if (rcvWindow < halfRcvWindow)
      {
        session.publish(new WindowMessage(halfRcvWindow));
        rcvWindow += halfRcvWindow;
      }
    } catch (IOException e)
    {
      // Any error on the socket will be handled by the thread calling receive()
      // just ignore.
    }
  }
  /**
   * stop the server.
   */
  public void stop()
opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java
@@ -869,91 +869,88 @@
    while (update == null)
    {
      InitializeRequestMessage initMsg = null;
      synchronized (broker)
      ReplicationMessage msg;
      try
      {
        ReplicationMessage msg;
        try
        msg = broker.receive();
        if (msg == null)
        {
          msg = broker.receive();
          if (msg == null)
          {
            // The server is in the shutdown process
            return null;
          }
          if (debugEnabled())
            if (!(msg instanceof HeartbeatMessage))
              TRACER.debugVerbose("Message received <" + msg + ">");
          if (msg instanceof AckMessage)
          {
            AckMessage ack = (AckMessage) msg;
            receiveAck(ack);
            }
            else if (msg instanceof InitializeRequestMessage)
          {
            // Another server requests us to provide entries
            // for a total update
              initMsg = (InitializeRequestMessage)msg;
            }
            else if (msg instanceof InitializeTargetMessage)
          {
            // Another server is exporting its entries to us
            InitializeTargetMessage importMsg = (InitializeTargetMessage) msg;
            try
            {
              // This must be done while we are still holding the
              // broker lock because we are now going to receive a
              // bunch of entries from the remote server and we
              // want the import thread to catch them and
              // not the ListenerThread.
              initialize(importMsg);
              }
              catch(DirectoryException de)
            {
              // Returns an error message to notify the sender
              ErrorMessage errorMsg =
                new ErrorMessage(importMsg.getsenderID(),
                de.getMessageObject());
              MessageBuilder mb = new MessageBuilder();
              mb.append(de.getMessageObject());
              TRACER.debugInfo(Message.toString(mb.toMessage()));
              broker.publish(errorMsg);
            }
            }
            else if (msg instanceof ErrorMessage)
          {
            if (ieContext != null)
            {
              // This is an error termination for the 2 following cases :
              // - either during an export
              // - or before an import really started
              //   For example, when we publish a request and the
              //  replicationServer did not find any import source.
                abandonImportExport((ErrorMessage)msg);
              }
              else
            {
              /* We can receive an error message from the replication server
               * in the following cases :
               * - we connected with an incorrect generation id
               */
                ErrorMessage errorMsg = (ErrorMessage)msg;
              logError(ERR_ERROR_MSG_RECEIVED.get(
                errorMsg.getDetails()));
            }
            }
            else if (msg instanceof UpdateMessage)
          {
            update = (UpdateMessage) msg;
            receiveUpdate(update);
          }
          }
          catch (SocketTimeoutException e)
        {
        // just retry
          // The server is in the shutdown process
          return null;
        }
        if (debugEnabled())
          if (!(msg instanceof HeartbeatMessage))
            TRACER.debugVerbose("Message received <" + msg + ">");
        if (msg instanceof AckMessage)
        {
          AckMessage ack = (AckMessage) msg;
          receiveAck(ack);
        }
        else if (msg instanceof InitializeRequestMessage)
        {
          // Another server requests us to provide entries
          // for a total update
          initMsg = (InitializeRequestMessage)msg;
        }
        else if (msg instanceof InitializeTargetMessage)
        {
          // Another server is exporting its entries to us
          InitializeTargetMessage importMsg = (InitializeTargetMessage) msg;
          try
          {
            // This must be done while we are still holding the
            // broker lock because we are now going to receive a
            // bunch of entries from the remote server and we
            // want the import thread to catch them and
            // not the ListenerThread.
            initialize(importMsg);
          }
          catch(DirectoryException de)
          {
            // Returns an error message to notify the sender
            ErrorMessage errorMsg =
              new ErrorMessage(importMsg.getsenderID(),
                  de.getMessageObject());
            MessageBuilder mb = new MessageBuilder();
            mb.append(de.getMessageObject());
            TRACER.debugInfo(Message.toString(mb.toMessage()));
            broker.publish(errorMsg);
          }
        }
        else if (msg instanceof ErrorMessage)
        {
          if (ieContext != null)
          {
            // This is an error termination for the 2 following cases :
            // - either during an export
            // - or before an import really started
            //   For example, when we publish a request and the
            //  replicationServer did not find any import source.
            abandonImportExport((ErrorMessage)msg);
          }
          else
          {
            /* We can receive an error message from the replication server
             * in the following cases :
             * - we connected with an incorrect generation id
             */
            ErrorMessage errorMsg = (ErrorMessage)msg;
            logError(ERR_ERROR_MSG_RECEIVED.get(
                errorMsg.getDetails()));
          }
        }
        else if (msg instanceof UpdateMessage)
        {
          update = (UpdateMessage) msg;
          receiveUpdate(update);
        }
      }
      catch (SocketTimeoutException e)
      {
        // just retry
      }
      // Test if we have received and export request message and
      // if that's the case handle it now.
@@ -1259,7 +1256,10 @@
    shutdown = true;
    // Stop the listener thread
    listenerThread.shutdown();
    if (listenerThread != null)
    {
      listenerThread.shutdown();
    }
    synchronized (this)
    {
@@ -1274,7 +1274,8 @@
    broker.stop();
    // Wait for the listener thread to stop
    listenerThread.waitForShutdown();
    if (listenerThread != null)
      listenerThread.waitForShutdown();
    // wait for completion of the persistentServerState thread.
    try
@@ -1441,6 +1442,7 @@
      {
        if (!dependency)
        {
          broker.updateWindowAfterReplay();
          if (msg.isAssured())
            ack(msg.getChangeNumber());
          incProcessedUpdates();
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ProtocolWindowTest.java
@@ -155,6 +155,7 @@
        while (true)
        {
          broker.receive();
          broker.updateWindowAfterReplay();
          rcvCount++;
        }
      }
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java
@@ -248,6 +248,7 @@
                      "uid");
      server1.publish(msg);
      ReplicationMessage msg2 = server2.receive();
      server2.updateWindowAfterReplay();
      if (msg2 instanceof DeleteMsg)
      {
        DeleteMsg del = (DeleteMsg) msg2;
@@ -263,6 +264,7 @@
      msg = new DeleteMsg("o=test", secondChangeNumberServer1, "uid");
      server1.publish(msg);
      msg2 = server2.receive();
      server2.updateWindowAfterReplay();
      if (msg2 instanceof DeleteMsg)
      {
        DeleteMsg del = (DeleteMsg) msg2;
@@ -280,6 +282,7 @@
                      "other-uid");
      server2.publish(msg);
      msg2 = server1.receive();
      server1.updateWindowAfterReplay();
      if (msg2 instanceof DeleteMsg)
      {
        DeleteMsg del = (DeleteMsg) msg2;
@@ -295,6 +298,7 @@
      msg = new DeleteMsg("o=test", secondChangeNumberServer2, "uid");
      server2.publish(msg);
      msg2 = server1.receive();
      server1.updateWindowAfterReplay();
      if (msg2 instanceof DeleteMsg)
      {
        DeleteMsg del = (DeleteMsg) msg2;
@@ -329,6 +333,7 @@
                             100, replicationServerPort, 1000, false);
      ReplicationMessage msg2 = broker.receive();
      broker.updateWindowAfterReplay();
      if (!(msg2 instanceof DeleteMsg))
        fail("ReplicationServer basic transmission failed:" + msg2);
      else
@@ -367,6 +372,7 @@
                             100, replicationServerPort, 5000, state);
      ReplicationMessage msg2 = broker.receive();
      broker.updateWindowAfterReplay();
      if (!(msg2 instanceof DeleteMsg))
      {
        fail("ReplicationServer basic transmission failed:" + msg2);
@@ -776,6 +782,7 @@
            msg2 = broker2.receive();
            if (msg2 == null)
              break;
            broker2.updateWindowAfterReplay();
          }
          catch (Exception e)
          {
@@ -982,6 +989,7 @@
        while (true)
        {
          ReplicationMessage msg = broker.receive();
          broker.updateWindowAfterReplay();
          if (msg == null)
            break;
          }