mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

gbellato
11.42.2008 2ec1e20dacc4606317fc9e38890117df638a1fff
opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java
@@ -869,91 +869,88 @@
    while (update == null)
    {
      InitializeRequestMessage initMsg = null;
      synchronized (broker)
      ReplicationMessage msg;
      try
      {
        ReplicationMessage msg;
        try
        msg = broker.receive();
        if (msg == null)
        {
          msg = broker.receive();
          if (msg == null)
          {
            // The server is in the shutdown process
            return null;
          }
          if (debugEnabled())
            if (!(msg instanceof HeartbeatMessage))
              TRACER.debugVerbose("Message received <" + msg + ">");
          if (msg instanceof AckMessage)
          {
            AckMessage ack = (AckMessage) msg;
            receiveAck(ack);
            }
            else if (msg instanceof InitializeRequestMessage)
          {
            // Another server requests us to provide entries
            // for a total update
              initMsg = (InitializeRequestMessage)msg;
            }
            else if (msg instanceof InitializeTargetMessage)
          {
            // Another server is exporting its entries to us
            InitializeTargetMessage importMsg = (InitializeTargetMessage) msg;
            try
            {
              // This must be done while we are still holding the
              // broker lock because we are now going to receive a
              // bunch of entries from the remote server and we
              // want the import thread to catch them and
              // not the ListenerThread.
              initialize(importMsg);
              }
              catch(DirectoryException de)
            {
              // Returns an error message to notify the sender
              ErrorMessage errorMsg =
                new ErrorMessage(importMsg.getsenderID(),
                de.getMessageObject());
              MessageBuilder mb = new MessageBuilder();
              mb.append(de.getMessageObject());
              TRACER.debugInfo(Message.toString(mb.toMessage()));
              broker.publish(errorMsg);
            }
            }
            else if (msg instanceof ErrorMessage)
          {
            if (ieContext != null)
            {
              // This is an error termination for the 2 following cases :
              // - either during an export
              // - or before an import really started
              //   For example, when we publish a request and the
              //  replicationServer did not find any import source.
                abandonImportExport((ErrorMessage)msg);
              }
              else
            {
              /* We can receive an error message from the replication server
               * in the following cases :
               * - we connected with an incorrect generation id
               */
                ErrorMessage errorMsg = (ErrorMessage)msg;
              logError(ERR_ERROR_MSG_RECEIVED.get(
                errorMsg.getDetails()));
            }
            }
            else if (msg instanceof UpdateMessage)
          {
            update = (UpdateMessage) msg;
            receiveUpdate(update);
          }
          }
          catch (SocketTimeoutException e)
        {
        // just retry
          // The server is in the shutdown process
          return null;
        }
        if (debugEnabled())
          if (!(msg instanceof HeartbeatMessage))
            TRACER.debugVerbose("Message received <" + msg + ">");
        if (msg instanceof AckMessage)
        {
          AckMessage ack = (AckMessage) msg;
          receiveAck(ack);
        }
        else if (msg instanceof InitializeRequestMessage)
        {
          // Another server requests us to provide entries
          // for a total update
          initMsg = (InitializeRequestMessage)msg;
        }
        else if (msg instanceof InitializeTargetMessage)
        {
          // Another server is exporting its entries to us
          InitializeTargetMessage importMsg = (InitializeTargetMessage) msg;
          try
          {
            // This must be done while we are still holding the
            // broker lock because we are now going to receive a
            // bunch of entries from the remote server and we
            // want the import thread to catch them and
            // not the ListenerThread.
            initialize(importMsg);
          }
          catch(DirectoryException de)
          {
            // Returns an error message to notify the sender
            ErrorMessage errorMsg =
              new ErrorMessage(importMsg.getsenderID(),
                  de.getMessageObject());
            MessageBuilder mb = new MessageBuilder();
            mb.append(de.getMessageObject());
            TRACER.debugInfo(Message.toString(mb.toMessage()));
            broker.publish(errorMsg);
          }
        }
        else if (msg instanceof ErrorMessage)
        {
          if (ieContext != null)
          {
            // This is an error termination for the 2 following cases :
            // - either during an export
            // - or before an import really started
            //   For example, when we publish a request and the
            //  replicationServer did not find any import source.
            abandonImportExport((ErrorMessage)msg);
          }
          else
          {
            /* We can receive an error message from the replication server
             * in the following cases :
             * - we connected with an incorrect generation id
             */
            ErrorMessage errorMsg = (ErrorMessage)msg;
            logError(ERR_ERROR_MSG_RECEIVED.get(
                errorMsg.getDetails()));
          }
        }
        else if (msg instanceof UpdateMessage)
        {
          update = (UpdateMessage) msg;
          receiveUpdate(update);
        }
      }
      catch (SocketTimeoutException e)
      {
        // just retry
      }
      // Test if we have received and export request message and
      // if that's the case handle it now.
@@ -1259,7 +1256,10 @@
    shutdown = true;
    // Stop the listener thread
    listenerThread.shutdown();
    if (listenerThread != null)
    {
      listenerThread.shutdown();
    }
    synchronized (this)
    {
@@ -1274,7 +1274,8 @@
    broker.stop();
    // Wait for the listener thread to stop
    listenerThread.waitForShutdown();
    if (listenerThread != null)
      listenerThread.waitForShutdown();
    // wait for completion of the persistentServerState thread.
    try
@@ -1441,6 +1442,7 @@
      {
        if (!dependency)
        {
          broker.updateWindowAfterReplay();
          if (msg.isAssured())
            ack(msg.getChangeNumber());
          incProcessedUpdates();