/*
|
* Copyright (c) 2016-present, RxJava Contributors.
|
*
|
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
|
* compliance with the License. You may obtain a copy of the License at
|
*
|
* http://www.apache.org/licenses/LICENSE-2.0
|
*
|
* Unless required by applicable law or agreed to in writing, software distributed under the License is
|
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
|
* the License for the specific language governing permissions and limitations under the License.
|
*/
|
|
package org.openidentityplatform.rxjava3.internal.util;
|
|
import java.util.concurrent.atomic.AtomicLong;
|
|
import io.reactivex.rxjava3.annotations.NonNull;
|
import io.reactivex.rxjava3.plugins.RxJavaPlugins;
|
|
/**
|
* Utility class to help with backpressure-related operations such as request aggregation.
|
*/
|
public final class BackpressureHelper {
|
/** Utility class. */
|
private BackpressureHelper() {
|
throw new IllegalStateException("No instances!");
|
}
|
|
/**
|
* Adds two long values and caps the sum at {@link Long#MAX_VALUE}.
|
* @param a the first value
|
* @param b the second value
|
* @return the sum capped at {@link Long#MAX_VALUE}
|
*/
|
public static long addCap(long a, long b) {
|
long u = a + b;
|
if (u < 0L) {
|
return Long.MAX_VALUE;
|
}
|
return u;
|
}
|
|
/**
|
* Multiplies two long values and caps the product at {@link Long#MAX_VALUE}.
|
* @param a the first value
|
* @param b the second value
|
* @return the product capped at {@link Long#MAX_VALUE}
|
*/
|
public static long multiplyCap(long a, long b) {
|
long u = a * b;
|
if (((a | b) >>> 31) != 0) {
|
if (u / a != b) {
|
return Long.MAX_VALUE;
|
}
|
}
|
return u;
|
}
|
|
/**
|
* Atomically adds the positive value n to the requested value in the {@link AtomicLong} and
|
* caps the result at {@link Long#MAX_VALUE} and returns the previous value.
|
* @param requested the {@code AtomicLong} holding the current requested value
|
* @param n the value to add, must be positive (not verified)
|
* @return the original value before the add
|
*/
|
public static long add(@NonNull AtomicLong requested, long n) {
|
for (;;) {
|
long r = requested.get();
|
if (r == Long.MAX_VALUE) {
|
return Long.MAX_VALUE;
|
}
|
long u = addCap(r, n);
|
if (requested.compareAndSet(r, u)) {
|
return r;
|
}
|
}
|
}
|
|
/**
|
* Atomically adds the positive value n to the requested value in the {@link AtomicLong} and
|
* caps the result at {@link Long#MAX_VALUE} and returns the previous value and
|
* considers {@link Long#MIN_VALUE} as a cancel indication (no addition then).
|
* @param requested the {@code AtomicLong} holding the current requested value
|
* @param n the value to add, must be positive (not verified)
|
* @return the original value before the add
|
*/
|
public static long addCancel(@NonNull AtomicLong requested, long n) {
|
for (;;) {
|
long r = requested.get();
|
if (r == Long.MIN_VALUE) {
|
return Long.MIN_VALUE;
|
}
|
if (r == Long.MAX_VALUE) {
|
return Long.MAX_VALUE;
|
}
|
long u = addCap(r, n);
|
if (requested.compareAndSet(r, u)) {
|
return r;
|
}
|
}
|
}
|
|
/**
|
* Atomically subtract the given number (positive, not validated) from the target field unless it contains {@link Long#MAX_VALUE}.
|
* @param requested the target field holding the current requested amount
|
* @param n the produced element count, positive (not validated)
|
* @return the new amount
|
*/
|
public static long produced(@NonNull AtomicLong requested, long n) {
|
for (;;) {
|
long current = requested.get();
|
if (current == Long.MAX_VALUE) {
|
return Long.MAX_VALUE;
|
}
|
long update = current - n;
|
if (update < 0L) {
|
RxJavaPlugins.onError(new IllegalStateException("More produced than requested: " + update));
|
update = 0L;
|
}
|
if (requested.compareAndSet(current, update)) {
|
return update;
|
}
|
}
|
}
|
|
/**
|
* Atomically subtract the given number (positive, not validated) from the target field if
|
* it doesn't contain {@link Long#MIN_VALUE} (indicating some cancelled state) or {@link Long#MAX_VALUE} (unbounded mode).
|
* @param requested the target field holding the current requested amount
|
* @param n the produced element count, positive (not validated)
|
* @return the new amount
|
*/
|
public static long producedCancel(@NonNull AtomicLong requested, long n) {
|
for (;;) {
|
long current = requested.get();
|
if (current == Long.MIN_VALUE) {
|
return Long.MIN_VALUE;
|
}
|
if (current == Long.MAX_VALUE) {
|
return Long.MAX_VALUE;
|
}
|
long update = current - n;
|
if (update < 0L) {
|
RxJavaPlugins.onError(new IllegalStateException("More produced than requested: " + update));
|
update = 0L;
|
}
|
if (requested.compareAndSet(current, update)) {
|
return update;
|
}
|
}
|
}
|
}
|