mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

gbellato
18.55.2009 d19acb303c4ff90e48fd98ce2d7ba739ca9ea2db
opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
@@ -188,6 +188,7 @@
  private long generationID;
  private int updateDoneCount = 0;
  private boolean connectRequiresRecovery = false;
  /**
   * Creates a new ReplicationServer Broker for a particular ReplicationDomain.
@@ -694,6 +695,21 @@
                rsServerId = serverInfo.getServerId();
                rsServerUrl = bestServer;
                receiveTopo(topologyMsg);
                // Log a message to let the administrator know that the failure
                // was resolved.
                // Wake up all the threads that were waiting on the window
                // of the previous connection.
                connectionError = false;
                if (sendWindow != null)
                {
                  sendWindow.release(Integer.MAX_VALUE);
                }
                sendWindow = new Semaphore(maxSendWindow);
                rcvWindow = maxRcvWindow;
                connected = true;
                // May have created a broker with null replication domain for
                // unit test purpose.
                if (domain != null)
@@ -703,8 +719,7 @@
                      serverInfo.getGenerationId(),
                      session);
                }
                receiveTopo(topologyMsg);
                connected = true;
                if (getRsGroupId() != groupId)
                {
                 // Connected to replication server with wrong group id:
@@ -766,17 +781,6 @@
      if (connected)
      {
        // Log a message to let the administrator know that the failure was
        // resolved.
        // Wake up all the threads that were waiting on the window
        // of the previous connection.
        connectionError = false;
        if (sendWindow != null)
        {
          sendWindow.release(Integer.MAX_VALUE);
        }
        sendWindow = new Semaphore(maxSendWindow);
        rcvWindow = maxRcvWindow;
        connectPhaseLock.notify();
        if ((serverInfo.getGenerationId() == this.getGenerationID()) ||
@@ -1786,6 +1790,25 @@
   */
  public void publish(ReplicationMsg msg)
  {
    _publish(msg, false);
  }
  /**
   * Publish a recovery message to the other servers.
   * @param msg the message to publish
   */
  public void publishRecovery(ReplicationMsg msg)
  {
    _publish(msg, true);
  }
  /**
   * Publish a message to the other servers.
   * @param msg the message to publish
   * @param recoveryMsg the message is a recovery Message
   */
  void _publish(ReplicationMsg msg, boolean recoveryMsg)
  {
    boolean done = false;
    while (!done && !shutdown)
@@ -1825,6 +1848,15 @@
          currentWindowSemaphore = sendWindow;
        }
        // If the Replication domain has decided that there is a need to
        // recover some changes then it is not allowed to send this
        // change but it will be the responsibility of the recovery thread to
        // do it.
        if (!recoveryMsg & connectRequiresRecovery)
        {
          return;
        }
        if (msg instanceof UpdateMsg)
        {
          // Acquiring the window credit must be done outside of the
@@ -2548,4 +2580,18 @@
      ctHeartbeatPublisherThread = null;
    }
  }
  /**
   * Set the connectRequiresRecovery to the provided value.
   * This flag is used to indicate if a recovery of Update is necessary
   * after a reconnection to a RS.
   * It is the responsibility of the ReplicationDomain to set it during the
   * sessionInitiated phase.
   *
   * @param b the new value of the connectRequiresRecovery.
   */
  public void setRecoveryRequired(boolean b)
  {
    connectRequiresRecovery = b;
  }
}