mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

gbellato
05.59.2007 97a7b362d2b9c414106920569d18c86d3dea0ac6
Fix for issue 1764: infinite loop when no replication server is up and running.

If there is no replication server, the domain will start anyway and will retry
connecting at regular intervals.
The changes that happen during this time will be sent to the replication server
when it starts (currently only for modify operations).
2 files modified
132 ■■■■ changed files
opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java 129 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java 3 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java
@@ -207,8 +207,10 @@
    }
    boolean checkState = true;
    while ((!connected) && (!shutdown))
    boolean receivedResponse = true;
    while ((!connected) && (!shutdown) && (receivedResponse))
    {
      receivedResponse = false;
      for (String server : servers)
      {
        int separator = server.lastIndexOf(':');
@@ -243,6 +245,7 @@
           */
          session.setSoTimeout(1000);
          startMsg = (ReplServerStartMessage) session.receive();
          receivedResponse = true;
          /*
           * We have sent our own protocol version to the replication server.
@@ -274,6 +277,7 @@
            maxSendWindow = startMsg.getWindowSize();
            this.sendWindow = new Semaphore(maxSendWindow);
            connected = true;
            startHeartBeat();
            break;
          }
          else
@@ -333,6 +337,7 @@
                {
                  publish(replayOp.generateMessage());
                }
                startHeartBeat();
                break;
              }
            }
@@ -375,51 +380,48 @@
        }
      }
      if (!connected)
      if ((!connected) && (checkState == true) && receivedResponse)
      {
        if (checkState == true)
        /*
         * We could not find a replicationServer that has seen all the
         * changes that this server has already processed, start again
         * the loop looking for any replicationServer.
         */
        int    msgID   = MSGID_COULD_NOT_FIND_CHANGELOG_WITH_MY_CHANGES;
        String message = getMessage(msgID);
        logError(ErrorLogCategory.SYNCHRONIZATION,
            ErrorLogSeverity.NOTICE,
            message, msgID);
        try
        {
          /*
           * We could not find a replicationServer that has seen all the
           * changes that this server has already processed, start again
           * the loop looking for any replicationServer.
           */
          try
          {
            Thread.sleep(500);
          } catch (InterruptedException e)
          {
            // TODO Auto-generated catch block
            e.printStackTrace();
          }
          checkState = false;
          int    msgID   = MSGID_COULD_NOT_FIND_CHANGELOG_WITH_MY_CHANGES;
          String message = getMessage(msgID);
          logError(ErrorLogCategory.SYNCHRONIZATION,
              ErrorLogSeverity.NOTICE,
              message, msgID);
        }
        else
          Thread.sleep(500);
        } catch (InterruptedException e)
        {
          /*
           * This server could not find any replicationServer
           * Let's wait a little and try again.
           */
          checkState = false;
          int    msgID   = MSGID_COULD_NOT_FIND_CHANGELOG;
          String message = getMessage(msgID);
          logError(ErrorLogCategory.SYNCHRONIZATION,
              ErrorLogSeverity.NOTICE, message, msgID);
          try
          {
            Thread.sleep(1000);
          } catch (InterruptedException e)
          {
          }
        }
        checkState = false;
      }
    }
    if (!connected)
    {
      /*
       * This server could not find any replicationServer
       * It's going to start in degraded mode.
       * Log a message
       */
      checkState = false;
      int    msgID   = MSGID_COULD_NOT_FIND_CHANGELOG;
      String message = getMessage(msgID);
      logError(ErrorLogCategory.SYNCHRONIZATION,
          ErrorLogSeverity.NOTICE, message, msgID);
    }
  }
  /**
   * Start the heartbeat monitor thread.
   */
  private void startHeartBeat()
  {
    // Start a heartbeat monitor thread.
    if (heartbeatInterval > 0)
    {
@@ -432,17 +434,27 @@
  /**
   * Restart the ReplicationBroker.
   * Delegates to {@link #reStart(ProtocolSession)}, passing null as the
   * failing session.
   */
  private void reStart()
  {
    reStart(null);
  }
  /**
   * Restart the ReplicationServer broker after a failure.
   *
   * @param failingSession the socket which failed
   */
  private void reStart(ProtocolSession failingSession)
  {
    numLostConnections++;
    try
    {
      failingSession.close();
      if (failingSession != null)
      {
        failingSession.close();
        numLostConnections++;
      }
    } catch (IOException e1)
    {
      // ignore
@@ -465,6 +477,16 @@
                 ErrorLogSeverity.SEVERE_ERROR,
                 message, msgID);
      }
      if ((!connected) && (!shutdown))
      {
        try
        {
          Thread.sleep(500);
        } catch (InterruptedException e)
        {
          // ignore
        }
      }
    }
  }
@@ -513,6 +535,9 @@
  {
    while (shutdown == false)
    {
      if (!connected)
        reStart();
      ProtocolSession failingSession = session;
      try
      {
@@ -575,26 +600,6 @@
  }
  /**
   * Restart the server after a suspension.
   * The body is currently an empty stub, so calling this method does nothing.
   * @throws Exception in case of errors.
   */
  public void restartReceive() throws Exception
  {
    // TODO: not implemented; this stub does nothing.
  }
  /**
   * Suspend message reception.
   * The body is currently an empty stub, so calling this method does nothing.
   * @throws Exception in case of errors.
   */
  public void suspendReceive() throws Exception
  {
    // TODO: not implemented; this stub does nothing.
  }
  /**
   * Set a timeout value.
   * With this option set to a non-zero value, calls to the receive() method
   * block for only this amount of time after which a
opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java
@@ -264,7 +264,6 @@
  private List<DN> branches = new ArrayList<DN>(0);
  private int listenerThreadNumber = 10;
  private boolean receiveStatus = true;
  private Collection<String> replicationServers;
@@ -345,8 +344,6 @@
      synchronized (broker)
      {
        broker.start(replicationServers);
        if (!receiveStatus)
          broker.suspendReceive();
      }
      // Retrieves the related backend and its config entry