mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

gbellato
11.42.2008 2ec1e20dacc4606317fc9e38890117df638a1fff
opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplayThread.java
@@ -56,6 +56,7 @@
  private BlockingQueue<UpdateToReplay> updateToReplayQueue = null;
  private boolean shutdown = false;
  private boolean done = false;
  private static int count = 0;
  /**
   * Constructor for the ReplayThread.
@@ -64,7 +65,7 @@
   */
  public ReplayThread(BlockingQueue<UpdateToReplay> updateToReplayQueue)
  {
     super("Replication Replay thread");
     super("Replication Replay thread " + count++);
     this.updateToReplayQueue = updateToReplayQueue;
  }
@@ -130,7 +131,7 @@
  {
    try
    {
      while (done == false)
      while ((done == false) && (this.isAlive()))
      {
        Thread.sleep(50);
      }
opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java
@@ -954,17 +954,9 @@
        {
          WindowMessage windowMsg = (WindowMessage) msg;
          sendWindow.release(windowMsg.getNumAck());
        } else
        }
        else
        {
          if (msg instanceof UpdateMessage)
          {
            rcvWindow--;
            if (rcvWindow < halfRcvWindow)
            {
              session.publish(new WindowMessage(halfRcvWindow));
              rcvWindow += halfRcvWindow;
            }
          }
          return msg;
        }
      } catch (SocketTimeoutException e)
@@ -988,6 +980,30 @@
  }
  /**
   * Performs the receive-window bookkeeping that has to happen after an
   * update has been treated by the worker threads.
   *
   * This should be called once the replay threads have done their job
   * and the window can be opened again.
   *
   * Each call decrements {@code rcvWindow}. When the value falls below
   * {@code halfRcvWindow}, a {@code WindowMessage} carrying
   * {@code halfRcvWindow} is published on the session and
   * {@code rcvWindow} is increased by that same amount.
   *
   * An {@code IOException} raised while publishing is deliberately
   * ignored by this method.
   */
  public synchronized void updateWindowAfterReplay()
  {
    try
    {
      // Count this call against the receive window.
      rcvWindow--;
      if (rcvWindow < halfRcvWindow)
      {
        session.publish(new WindowMessage(halfRcvWindow));
        // Only reached when publish() did not throw: on failure the
        // local window is left at its decremented value.
        rcvWindow += halfRcvWindow;
      }
    } catch (IOException e)
    {
      // Any error on the socket will be handled by the thread calling
      // receive(), so it is simply ignored here.
    }
  }
  /**
   * stop the server.
   */
  public void stop()
opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java
@@ -869,91 +869,88 @@
    while (update == null)
    {
      InitializeRequestMessage initMsg = null;
      synchronized (broker)
      ReplicationMessage msg;
      try
      {
        ReplicationMessage msg;
        try
        msg = broker.receive();
        if (msg == null)
        {
          msg = broker.receive();
          if (msg == null)
          {
            // The server is in the shutdown process
            return null;
          }
          if (debugEnabled())
            if (!(msg instanceof HeartbeatMessage))
              TRACER.debugVerbose("Message received <" + msg + ">");
          if (msg instanceof AckMessage)
          {
            AckMessage ack = (AckMessage) msg;
            receiveAck(ack);
            }
            else if (msg instanceof InitializeRequestMessage)
          {
            // Another server requests us to provide entries
            // for a total update
              initMsg = (InitializeRequestMessage)msg;
            }
            else if (msg instanceof InitializeTargetMessage)
          {
            // Another server is exporting its entries to us
            InitializeTargetMessage importMsg = (InitializeTargetMessage) msg;
            try
            {
              // This must be done while we are still holding the
              // broker lock because we are now going to receive a
              // bunch of entries from the remote server and we
              // want the import thread to catch them and
              // not the ListenerThread.
              initialize(importMsg);
              }
              catch(DirectoryException de)
            {
              // Returns an error message to notify the sender
              ErrorMessage errorMsg =
                new ErrorMessage(importMsg.getsenderID(),
                de.getMessageObject());
              MessageBuilder mb = new MessageBuilder();
              mb.append(de.getMessageObject());
              TRACER.debugInfo(Message.toString(mb.toMessage()));
              broker.publish(errorMsg);
            }
            }
            else if (msg instanceof ErrorMessage)
          {
            if (ieContext != null)
            {
              // This is an error termination for the 2 following cases :
              // - either during an export
              // - or before an import really started
              //   For example, when we publish a request and the
              //  replicationServer did not find any import source.
                abandonImportExport((ErrorMessage)msg);
              }
              else
            {
              /* We can receive an error message from the replication server
               * in the following cases :
               * - we connected with an incorrect generation id
               */
                ErrorMessage errorMsg = (ErrorMessage)msg;
              logError(ERR_ERROR_MSG_RECEIVED.get(
                errorMsg.getDetails()));
            }
            }
            else if (msg instanceof UpdateMessage)
          {
            update = (UpdateMessage) msg;
            receiveUpdate(update);
          }
          }
          catch (SocketTimeoutException e)
        {
        // just retry
          // The server is in the shutdown process
          return null;
        }
        if (debugEnabled())
          if (!(msg instanceof HeartbeatMessage))
            TRACER.debugVerbose("Message received <" + msg + ">");
        if (msg instanceof AckMessage)
        {
          AckMessage ack = (AckMessage) msg;
          receiveAck(ack);
        }
        else if (msg instanceof InitializeRequestMessage)
        {
          // Another server requests us to provide entries
          // for a total update
          initMsg = (InitializeRequestMessage)msg;
        }
        else if (msg instanceof InitializeTargetMessage)
        {
          // Another server is exporting its entries to us
          InitializeTargetMessage importMsg = (InitializeTargetMessage) msg;
          try
          {
            // This must be done while we are still holding the
            // broker lock because we are now going to receive a
            // bunch of entries from the remote server and we
            // want the import thread to catch them and
            // not the ListenerThread.
            initialize(importMsg);
          }
          catch(DirectoryException de)
          {
            // Returns an error message to notify the sender
            ErrorMessage errorMsg =
              new ErrorMessage(importMsg.getsenderID(),
                  de.getMessageObject());
            MessageBuilder mb = new MessageBuilder();
            mb.append(de.getMessageObject());
            TRACER.debugInfo(Message.toString(mb.toMessage()));
            broker.publish(errorMsg);
          }
        }
        else if (msg instanceof ErrorMessage)
        {
          if (ieContext != null)
          {
            // This is an error termination for the 2 following cases :
            // - either during an export
            // - or before an import really started
            //   For example, when we publish a request and the
            //  replicationServer did not find any import source.
            abandonImportExport((ErrorMessage)msg);
          }
          else
          {
            /* We can receive an error message from the replication server
             * in the following cases :
             * - we connected with an incorrect generation id
             */
            ErrorMessage errorMsg = (ErrorMessage)msg;
            logError(ERR_ERROR_MSG_RECEIVED.get(
                errorMsg.getDetails()));
          }
        }
        else if (msg instanceof UpdateMessage)
        {
          update = (UpdateMessage) msg;
          receiveUpdate(update);
        }
      }
      catch (SocketTimeoutException e)
      {
        // just retry
      }
      // Test if we have received an export request message and,
      // if that's the case, handle it now.
@@ -1259,7 +1256,10 @@
    shutdown = true;
    // Stop the listener thread
    listenerThread.shutdown();
    if (listenerThread != null)
    {
      listenerThread.shutdown();
    }
    synchronized (this)
    {
@@ -1274,7 +1274,8 @@
    broker.stop();
    // Wait for the listener thread to stop
    listenerThread.waitForShutdown();
    if (listenerThread != null)
      listenerThread.waitForShutdown();
    // wait for completion of the persistentServerState thread.
    try
@@ -1441,6 +1442,7 @@
      {
        if (!dependency)
        {
          broker.updateWindowAfterReplay();
          if (msg.isAssured())
            ack(msg.getChangeNumber());
          incProcessedUpdates();
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ProtocolWindowTest.java
@@ -155,6 +155,7 @@
        while (true)
        {
          broker.receive();
          broker.updateWindowAfterReplay();
          rcvCount++;
        }
      }
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java
@@ -248,6 +248,7 @@
                      "uid");
      server1.publish(msg);
      ReplicationMessage msg2 = server2.receive();
      server2.updateWindowAfterReplay();
      if (msg2 instanceof DeleteMsg)
      {
        DeleteMsg del = (DeleteMsg) msg2;
@@ -263,6 +264,7 @@
      msg = new DeleteMsg("o=test", secondChangeNumberServer1, "uid");
      server1.publish(msg);
      msg2 = server2.receive();
      server2.updateWindowAfterReplay();
      if (msg2 instanceof DeleteMsg)
      {
        DeleteMsg del = (DeleteMsg) msg2;
@@ -280,6 +282,7 @@
                      "other-uid");
      server2.publish(msg);
      msg2 = server1.receive();
      server1.updateWindowAfterReplay();
      if (msg2 instanceof DeleteMsg)
      {
        DeleteMsg del = (DeleteMsg) msg2;
@@ -295,6 +298,7 @@
      msg = new DeleteMsg("o=test", secondChangeNumberServer2, "uid");
      server2.publish(msg);
      msg2 = server1.receive();
      server1.updateWindowAfterReplay();
      if (msg2 instanceof DeleteMsg)
      {
        DeleteMsg del = (DeleteMsg) msg2;
@@ -329,6 +333,7 @@
                             100, replicationServerPort, 1000, false);
      ReplicationMessage msg2 = broker.receive();
      broker.updateWindowAfterReplay();
      if (!(msg2 instanceof DeleteMsg))
        fail("ReplicationServer basic transmission failed:" + msg2);
      else
@@ -367,6 +372,7 @@
                             100, replicationServerPort, 5000, state);
      ReplicationMessage msg2 = broker.receive();
      broker.updateWindowAfterReplay();
      if (!(msg2 instanceof DeleteMsg))
      {
        fail("ReplicationServer basic transmission failed:" + msg2);
@@ -776,6 +782,7 @@
            msg2 = broker2.receive();
            if (msg2 == null)
              break;
            broker2.updateWindowAfterReplay();
          }
          catch (Exception e)
          {
@@ -982,6 +989,7 @@
        while (true)
        {
          ReplicationMessage msg = broker.receive();
          broker.updateWindowAfterReplay();
          if (msg == null)
            break;
          }