mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

gbellato
11.42.2008 2ec1e20dacc4606317fc9e38890117df638a1fff

Since the changes made in revision 3695
(issue 1288: replication should not create 10 threads for each replication domain),
the window mechanism from the Replication Server to the LDAP server no longer works:
the window ACK is sent as soon as the LDAP server has read the message from
the socket, whereas it should be sent only after the operation has been replayed.

This can cause out-of-memory problems and incorrect monitoring.

The fix is to move the sending of the WindowMessage response from the ListenerThread
to the ReplayThread, where it is sent at the end of message processing.
5 files modified
58 ■■■■ changed files
opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplayThread.java 5 ●●●●● patch | view | raw | blame | history
opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java 36 ●●●● patch | view | raw | blame | history
opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java 8 ●●●●● patch | view | raw | blame | history
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ProtocolWindowTest.java 1 ●●●● patch | view | raw | blame | history
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java 8 ●●●●● patch | view | raw | blame | history
opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplayThread.java
@@ -56,6 +56,7 @@
  private BlockingQueue<UpdateToReplay> updateToReplayQueue = null;
  private boolean shutdown = false;
  private boolean done = false;
  private static int count = 0;
  /**
   * Constructor for the ReplayThread.
@@ -64,7 +65,7 @@
   */
  public ReplayThread(BlockingQueue<UpdateToReplay> updateToReplayQueue)
  {
     super("Replication Replay thread");
     super("Replication Replay thread " + count++);
     this.updateToReplayQueue = updateToReplayQueue;
  }
@@ -130,7 +131,7 @@
  {
    try
    {
      while (done == false)
      while ((done == false) && (this.isAlive()))
      {
        Thread.sleep(50);
      }
opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java
@@ -954,17 +954,9 @@
        {
          WindowMessage windowMsg = (WindowMessage) msg;
          sendWindow.release(windowMsg.getNumAck());
        } else
        {
          if (msg instanceof UpdateMessage)
          {
            rcvWindow--;
            if (rcvWindow < halfRcvWindow)
            {
              session.publish(new WindowMessage(halfRcvWindow));
              rcvWindow += halfRcvWindow;
            }
          }
        else
        {
          return msg;
        }
      } catch (SocketTimeoutException e)
@@ -988,6 +980,30 @@
  }
  /**
   * Decrements the receive window and, when it drops below
   * halfRcvWindow, publishes a WindowMessage(halfRcvWindow) on the
   * session and credits rcvWindow by the same amount.
   *
   * This should be called once the replay threads have done their job
   * and the window can be opened again.
   */
  public synchronized void updateWindowAfterReplay()
  {
    try
    {
      rcvWindow--;
      if (rcvWindow < halfRcvWindow)
      {
        session.publish(new WindowMessage(halfRcvWindow));
        rcvWindow += halfRcvWindow;
      }
    } catch (IOException e)
    {
      // Any error on the socket will be handled by the thread calling
      // receive(), so it is deliberately ignored here.
    }
  }
  /**
   * stop the server.
   */
  public void stop()
opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java
@@ -869,8 +869,6 @@
    while (update == null)
    {
      InitializeRequestMessage initMsg = null;
      synchronized (broker)
      {
        ReplicationMessage msg;
        try
        {
@@ -954,7 +952,6 @@
        {
        // just retry
        }
      }
      // Test if we have received and export request message and
      // if that's the case handle it now.
      // This must be done outside of the portion of code protected
@@ -1259,7 +1256,10 @@
    shutdown = true;
    // Stop the listener thread
    if (listenerThread != null)
    {
    listenerThread.shutdown();
    }
    synchronized (this)
    {
@@ -1274,6 +1274,7 @@
    broker.stop();
    // Wait for the listener thread to stop
    if (listenerThread != null)
    listenerThread.waitForShutdown();
    // wait for completion of the persistentServerState thread.
@@ -1441,6 +1442,7 @@
      {
        if (!dependency)
        {
          broker.updateWindowAfterReplay();
          if (msg.isAssured())
            ack(msg.getChangeNumber());
          incProcessedUpdates();
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ProtocolWindowTest.java
@@ -155,6 +155,7 @@
        while (true)
        {
          broker.receive();
          broker.updateWindowAfterReplay();
          rcvCount++;
        }
      }
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java
@@ -248,6 +248,7 @@
                      "uid");
      server1.publish(msg);
      ReplicationMessage msg2 = server2.receive();
      server2.updateWindowAfterReplay();
      if (msg2 instanceof DeleteMsg)
      {
        DeleteMsg del = (DeleteMsg) msg2;
@@ -263,6 +264,7 @@
      msg = new DeleteMsg("o=test", secondChangeNumberServer1, "uid");
      server1.publish(msg);
      msg2 = server2.receive();
      server2.updateWindowAfterReplay();
      if (msg2 instanceof DeleteMsg)
      {
        DeleteMsg del = (DeleteMsg) msg2;
@@ -280,6 +282,7 @@
                      "other-uid");
      server2.publish(msg);
      msg2 = server1.receive();
      server1.updateWindowAfterReplay();
      if (msg2 instanceof DeleteMsg)
      {
        DeleteMsg del = (DeleteMsg) msg2;
@@ -295,6 +298,7 @@
      msg = new DeleteMsg("o=test", secondChangeNumberServer2, "uid");
      server2.publish(msg);
      msg2 = server1.receive();
      server1.updateWindowAfterReplay();
      if (msg2 instanceof DeleteMsg)
      {
        DeleteMsg del = (DeleteMsg) msg2;
@@ -329,6 +333,7 @@
                             100, replicationServerPort, 1000, false);
      ReplicationMessage msg2 = broker.receive();
      broker.updateWindowAfterReplay();
      if (!(msg2 instanceof DeleteMsg))
        fail("ReplicationServer basic transmission failed:" + msg2);
      else
@@ -367,6 +372,7 @@
                             100, replicationServerPort, 5000, state);
      ReplicationMessage msg2 = broker.receive();
      broker.updateWindowAfterReplay();
      if (!(msg2 instanceof DeleteMsg))
      {
        fail("ReplicationServer basic transmission failed:" + msg2);
@@ -776,6 +782,7 @@
            msg2 = broker2.receive();
            if (msg2 == null)
              break;
            broker2.updateWindowAfterReplay();
          }
          catch (Exception e)
          {
@@ -982,6 +989,7 @@
        while (true)
        {
          ReplicationMessage msg = broker.receive();
          broker.updateWindowAfterReplay();
          if (msg == null)
            break;
          }