mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

ludovicp
27.28.2010 a5c5efbf8ca56c059709953f7fedb647dadaed06
opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
@@ -316,6 +316,15 @@
  }
  /**
   * Set the generation id - for test purpose.
   * @param generationID The generation id
   */
  public void setGenerationID(long generationID)
  {
    this.generationID = generationID;
  }
  /**
   * Gets the server url of the RS we are connected to.
   * @return The server url of the RS we are connected to
   */
@@ -727,6 +736,15 @@
    {
      this.locallyConfigured = locallyConfigured;
    }
    /**
     * Returns a string representation of this object.
     * @return A string representation of this object.
     */
    public String toString()
    {
      return "Url:"+ this.getServerURL() + " ServerId:" + this.serverId;
    }
  }
  private void connect()
@@ -859,7 +877,8 @@
        // Best found, now initialize connection to this one (handshake phase 1)
        if (debugEnabled())
          TRACER.debugInfo(
            "phase 2 : will perform PhaseOneH with the preferred RS.");
            "phase 2 : will perform PhaseOneH with the preferred RS="
              + replicationServerInfo);
        replicationServerInfo = performPhaseOneHandshake(
          replicationServerInfo.getServerURL(), true);
@@ -2225,18 +2244,20 @@
  /**
   * restart the ReplicationBroker.
   * @param infiniteTry the socket which failed
   */
  public void reStart()
  public void reStart(boolean infiniteTry)
  {
    reStart(this.session);
    reStart(this.session, infiniteTry);
  }
  /**
   * Restart the ReplicationServer broker after a failure.
   *
   * @param failingSession the socket which failed
   * @param infiniteTry the socket which failed
   */
  public void reStart(ProtocolSession failingSession)
  public void reStart(ProtocolSession failingSession, boolean infiniteTry)
  {
    if (failingSession != null)
@@ -2268,6 +2289,7 @@
      rsGroupId = (byte) -1;
      rsServerId = -1;
      rsServerUrl = null;
      session = null;
    }
    while (!this.connected && (!this.shutdown))
    {
@@ -2282,6 +2304,8 @@
        mb.append(stackTraceToSingleLineString(e));
        logError(mb.toMessage());
      }
      if ((!connected) && (!infiniteTry))
        break;
      if ((!connected) && (!shutdown))
      {
        try
@@ -2293,6 +2317,11 @@
        }
      }
    }
    if (debugEnabled())
      TRACER.debugInfo(this +
          " end restart : connected=" + connected +
          " with RSid=" + this.getRsServerId() +
          " genid=" + this.generationID);
  }
  /**
@@ -2301,7 +2330,18 @@
   */
  public void publish(ReplicationMsg msg)
  {
    _publish(msg, false);
    _publish(msg, false, true);
  }
  /**
   * Publish a message to the other servers.
   * @param msg            The message to publish.
   * @param retryOnFailure Whether reconnect should automatically be done.
   * @return               Whether publish succeeded.
   */
  public boolean publish(ReplicationMsg msg, boolean retryOnFailure)
  {
    return _publish(msg, false, retryOnFailure);
  }
  /**
@@ -2310,15 +2350,18 @@
   */
  public void publishRecovery(ReplicationMsg msg)
  {
    _publish(msg, true);
    _publish(msg, true, true);
  }
  /**
   * Publish a message to the other servers.
   * @param msg the message to publish
   * @param recoveryMsg the message is a recovery Message
   * @param retryOnFailure whether retry should be done on failure
   * @return whether the message was successfully sent.
   */
  void _publish(ReplicationMsg msg, boolean recoveryMsg)
  boolean _publish(ReplicationMsg msg, boolean recoveryMsg,
      boolean retryOnFailure)
  {
    boolean done = false;
@@ -2338,7 +2381,7 @@
            "message is not possible due to existing connection error.");
        }
        return;
        return false;
      }
      try
@@ -2365,7 +2408,7 @@
        // do it.
        if (!recoveryMsg & connectRequiresRecovery)
        {
          return;
          return false;
        }
        if (msg instanceof UpdateMsg)
@@ -2408,6 +2451,9 @@
        }
      } catch (IOException e)
      {
        if (!retryOnFailure)
          return false;
        // The receive threads should handle reconnection or
        // mark this broker in error. Just retry.
        synchronized (connectPhaseLock)
@@ -2435,6 +2481,7 @@
        }
      }
    }
    return true;
  }
  /**
@@ -2450,7 +2497,7 @@
   */
  public ReplicationMsg receive() throws SocketTimeoutException
  {
    return receive(false);
    return receive(false, true, false);
  }
  /**
@@ -2459,22 +2506,29 @@
   * called in a single thread or protected by a locking mechanism
   * before being called.
   *
   * @return the received message
   * @throws SocketTimeoutException if the timeout set by setSoTimeout
   *         has expired
   * @param allowReconnectionMechanism If true, this allows the reconnection
   * mechanism to disconnect the broker if it detects that it should reconnect
   * to another replication server because of some criteria defined by the
   * algorithm where we choose a suitable replication server.
   * @param reconnectToTheBestRS Whether broker will automatically switch
   *                             to the best suitable RS.
   * @param reconnectOnFailure   Whether broker will automatically reconnect
   *                             on failure.
   * @param returnOnTopoChange   Whether broker should return TopologyMsg
   *                             received.
   * @return the received message
   *
   * @throws SocketTimeoutException if the timeout set by setSoTimeout
   *         has expired
   */
  public ReplicationMsg receive(boolean allowReconnectionMechanism)
  public ReplicationMsg receive(boolean reconnectToTheBestRS,
      boolean reconnectOnFailure, boolean returnOnTopoChange)
    throws SocketTimeoutException
  {
    while (shutdown == false)
    {
      if (!connected)
      if ((reconnectOnFailure) && (!connected))
      {
        reStart(null);
        // infinite try to reconnect
        reStart(null, true);
      }
      ProtocolSession failingSession = session;
@@ -2496,11 +2550,16 @@
        {
          TopologyMsg topoMsg = (TopologyMsg) msg;
          receiveTopo(topoMsg);
          if (allowReconnectionMechanism)
          if (reconnectToTheBestRS)
          {
            // Reset wait time before next computation of best server
            mustRunBestServerCheckingAlgorithm = 0;
          }
          // Caller wants to check what's changed
          if (returnOnTopoChange)
            return msg;
        } else if (msg instanceof StopMsg)
        {
          /*
@@ -2512,7 +2571,7 @@
            Integer.toString(serverId));
          logError(message);
          // Try to find a suitable RS
          this.reStart(failingSession);
          this.reStart(failingSession, true);
        } else if (msg instanceof MonitorMsg)
        {
          // This is the response to a MonitorRequest that was sent earlier or
@@ -2551,7 +2610,7 @@
          // it is still the one we are currently connected to. If not,
          // disconnect properly and let the connection algorithm re-connect to
          // best replication server
          if (allowReconnectionMechanism)
          if (reconnectToTheBestRS)
          {
            mustRunBestServerCheckingAlgorithm++;
            if (mustRunBestServerCheckingAlgorithm == 2)
@@ -2572,9 +2631,10 @@
                  NOTE_NEW_BEST_REPLICATION_SERVER.get(baseDn.toString(),
                  Integer.toString(serverId),
                  Integer.toString(rsServerId),
                  rsServerUrl);
                  rsServerUrl,
                  Integer.toString(bestServerInfo.getServerId()));
                logError(message);
                reStart();
                reStart(null, true);
              }
              // Reset wait time before next computation of best server
@@ -2603,10 +2663,13 @@
              Integer.toString(serverId));
            logError(message);
          }
          this.reStart(failingSession);
          if (reconnectOnFailure)
            reStart(failingSession, true);
          else
            break; // does not seem necessary to explicitely disconnect ..
        }
      }
    }
    } // while !shutdown
    return null;
  }
@@ -2676,11 +2739,10 @@
  public void stop()
  {
    if (debugEnabled())
    {
      debugInfo("ReplicationBroker " + serverId + " is stopping and will" +
        " close the connection to replication server " + rsServerId + " for" +
        " domain " + baseDn);
    }
    stopRSHeartBeatMonitoring();
    stopChangeTimeHeartBeatPublishing();
    replicationServer = "stopped";
@@ -2690,25 +2752,17 @@
    rsServerId = -1;
    rsServerUrl = null;
    if (session != null)
    try
    {
      if (protocolVersion >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
      {
        // V4 protocol introduces a StopMsg to properly end communications
        try
        {
          session.publish(new StopMsg());
        } catch (IOException ioe)
        {
          // Anyway, going to close session, so nothing to do
        }
      }
      try
      {
        session.close();
      } catch (IOException e)
      {
      }
      session.close();
    } catch (Exception e)
    {
      // Anyway, going to close session, so nothing to do
    }
  }
@@ -2979,6 +3033,9 @@
   */
  public void receiveTopo(TopologyMsg topoMsg)
  {
    if (debugEnabled())
      TRACER.debugInfo(this + " receive TopologyMsg=" + topoMsg);
    // Store new DS list
    dsList = topoMsg.getDsList();
@@ -3100,4 +3157,14 @@
  {
    connectRequiresRecovery = b;
  }
  /**
   * Returns whether the broker is shutting down.
   * @return whether the broker is shutting down.
   */
  public boolean shuttingDown()
  {
    return shutdown;
  }
}