mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

gbellato
05.59.2007 13231f3def739a90b963d42853dea768789925f1
opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java
@@ -207,8 +207,10 @@
    }
    boolean checkState = true;
    while ((!connected) && (!shutdown))
    boolean receivedResponse = true;
    while ((!connected) && (!shutdown) && (receivedResponse))
    {
      receivedResponse = false;
      for (String server : servers)
      {
        int separator = server.lastIndexOf(':');
@@ -243,6 +245,7 @@
           */
          session.setSoTimeout(1000);
          startMsg = (ReplServerStartMessage) session.receive();
          receivedResponse = true;
          /*
           * We have sent our own protocol version to the replication server.
@@ -274,6 +277,7 @@
            maxSendWindow = startMsg.getWindowSize();
            this.sendWindow = new Semaphore(maxSendWindow);
            connected = true;
            startHeartBeat();
            break;
          }
          else
@@ -333,6 +337,7 @@
                {
                  publish(replayOp.generateMessage());
                }
                startHeartBeat();
                break;
              }
            }
@@ -375,51 +380,48 @@
        }
      }
      if (!connected)
      if ((!connected) && (checkState == true) && receivedResponse)
      {
        if (checkState == true)
        /*
         * We could not find a replicationServer that has seen all the
         * changes that this server has already processed, start again
         * the loop looking for any replicationServer.
         */
        int    msgID   = MSGID_COULD_NOT_FIND_CHANGELOG_WITH_MY_CHANGES;
        String message = getMessage(msgID);
        logError(ErrorLogCategory.SYNCHRONIZATION,
            ErrorLogSeverity.NOTICE,
            message, msgID);
        try
        {
          /*
           * We could not find a replicationServer that has seen all the
           * changes that this server has already processed, start again
           * the loop looking for any replicationServer.
           */
          try
          {
            Thread.sleep(500);
          } catch (InterruptedException e)
          {
            // TODO Auto-generated catch block
            e.printStackTrace();
          }
          checkState = false;
          int    msgID   = MSGID_COULD_NOT_FIND_CHANGELOG_WITH_MY_CHANGES;
          String message = getMessage(msgID);
          logError(ErrorLogCategory.SYNCHRONIZATION,
              ErrorLogSeverity.NOTICE,
              message, msgID);
        }
        else
          Thread.sleep(500);
        } catch (InterruptedException e)
        {
          /*
           * This server could not find any replicationServer
           * Let's wait a little and try again.
           */
          checkState = false;
          int    msgID   = MSGID_COULD_NOT_FIND_CHANGELOG;
          String message = getMessage(msgID);
          logError(ErrorLogCategory.SYNCHRONIZATION,
              ErrorLogSeverity.NOTICE, message, msgID);
          try
          {
            Thread.sleep(1000);
          } catch (InterruptedException e)
          {
          }
        }
        checkState = false;
      }
    }
    if (!connected)
    {
      /*
       * This server could not find any replicationServer
       * It's going to start in degraded mode.
       * Log a message
       */
      checkState = false;
      int    msgID   = MSGID_COULD_NOT_FIND_CHANGELOG;
      String message = getMessage(msgID);
      logError(ErrorLogCategory.SYNCHRONIZATION,
          ErrorLogSeverity.NOTICE, message, msgID);
    }
  }
  /**
   * Start the heartbeat monitor thread.
   */
  private void startHeartBeat()
  {
    // Start a heartbeat monitor thread.
    if (heartbeatInterval > 0)
    {
@@ -432,17 +434,27 @@
  /**
   * restart the ReplicationBroker.
   */
  private void reStart()
  {
    reStart(null);
  }
  /**
   * Restart the ReplicationServer broker after a failure.
   *
   * @param failingSession the socket which failed
   */
  private void reStart(ProtocolSession failingSession)
  {
    numLostConnections++;
    try
    {
      failingSession.close();
      if (failingSession != null)
      {
        failingSession.close();
        numLostConnections++;
      }
    } catch (IOException e1)
    {
      // ignore
@@ -465,6 +477,16 @@
                 ErrorLogSeverity.SEVERE_ERROR,
                 message, msgID);
      }
      if ((!connected) && (!shutdown))
      {
        try
        {
          Thread.sleep(500);
        } catch (InterruptedException e)
        {
          // ignore
        }
      }
    }
  }
@@ -513,6 +535,9 @@
  {
    while (shutdown == false)
    {
      if (!connected)
        reStart();
      ProtocolSession failingSession = session;
      try
      {
@@ -575,26 +600,6 @@
  }
  /**
   * restart the server after a suspension.
   * @throws Exception in case of errors.
   */
  public void restartReceive() throws Exception
  {
    // TODO Auto-generated method stub
  }
  /**
   * Suspend message reception.
   * @throws Exception in case of errors.
   */
  public void suspendReceive() throws Exception
  {
    // TODO Auto-generated method stub
  }
  /**
   * Set a timeout value.
   * With this option set to a non-zero value, calls to the receive() method
   * block for only this amount of time after which a