| New file |
| | |
| | | /* |
| | | * Copyright (c) 2016-present, RxJava Contributors. |
| | | * |
| | | * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in |
| | | * compliance with the License. You may obtain a copy of the License at |
| | | * |
| | | * http://www.apache.org/licenses/LICENSE-2.0 |
| | | * |
| | | * Unless required by applicable law or agreed to in writing, software distributed under the License is |
| | | * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See |
| | | * the License for the specific language governing permissions and limitations under the License. |
| | | */ |
| | | |
| | | package org.openidentityplatform.rxjava3.internal.util; |
| | | |
| | | import java.util.concurrent.atomic.AtomicLong; |
| | | |
| | | import io.reactivex.rxjava3.annotations.NonNull; |
| | | import io.reactivex.rxjava3.plugins.RxJavaPlugins; |
| | | |
| | | /** |
| | | * Utility class to help with backpressure-related operations such as request aggregation. |
| | | */ |
| | | public final class BackpressureHelper { |
| | | /** Utility class. */ |
| | | private BackpressureHelper() { |
| | | throw new IllegalStateException("No instances!"); |
| | | } |
| | | |
| | | /** |
| | | * Adds two long values and caps the sum at {@link Long#MAX_VALUE}. |
| | | * @param a the first value |
| | | * @param b the second value |
| | | * @return the sum capped at {@link Long#MAX_VALUE} |
| | | */ |
| | | public static long addCap(long a, long b) { |
| | | long u = a + b; |
| | | if (u < 0L) { |
| | | return Long.MAX_VALUE; |
| | | } |
| | | return u; |
| | | } |
| | | |
| | | /** |
| | | * Multiplies two long values and caps the product at {@link Long#MAX_VALUE}. |
| | | * @param a the first value |
| | | * @param b the second value |
| | | * @return the product capped at {@link Long#MAX_VALUE} |
| | | */ |
| | | public static long multiplyCap(long a, long b) { |
| | | long u = a * b; |
| | | if (((a | b) >>> 31) != 0) { |
| | | if (u / a != b) { |
| | | return Long.MAX_VALUE; |
| | | } |
| | | } |
| | | return u; |
| | | } |
| | | |
| | | /** |
| | | * Atomically adds the positive value n to the requested value in the {@link AtomicLong} and |
| | | * caps the result at {@link Long#MAX_VALUE} and returns the previous value. |
| | | * @param requested the {@code AtomicLong} holding the current requested value |
| | | * @param n the value to add, must be positive (not verified) |
| | | * @return the original value before the add |
| | | */ |
| | | public static long add(@NonNull AtomicLong requested, long n) { |
| | | for (;;) { |
| | | long r = requested.get(); |
| | | if (r == Long.MAX_VALUE) { |
| | | return Long.MAX_VALUE; |
| | | } |
| | | long u = addCap(r, n); |
| | | if (requested.compareAndSet(r, u)) { |
| | | return r; |
| | | } |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Atomically adds the positive value n to the requested value in the {@link AtomicLong} and |
| | | * caps the result at {@link Long#MAX_VALUE} and returns the previous value and |
| | | * considers {@link Long#MIN_VALUE} as a cancel indication (no addition then). |
| | | * @param requested the {@code AtomicLong} holding the current requested value |
| | | * @param n the value to add, must be positive (not verified) |
| | | * @return the original value before the add |
| | | */ |
| | | public static long addCancel(@NonNull AtomicLong requested, long n) { |
| | | for (;;) { |
| | | long r = requested.get(); |
| | | if (r == Long.MIN_VALUE) { |
| | | return Long.MIN_VALUE; |
| | | } |
| | | if (r == Long.MAX_VALUE) { |
| | | return Long.MAX_VALUE; |
| | | } |
| | | long u = addCap(r, n); |
| | | if (requested.compareAndSet(r, u)) { |
| | | return r; |
| | | } |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Atomically subtract the given number (positive, not validated) from the target field unless it contains {@link Long#MAX_VALUE}. |
| | | * @param requested the target field holding the current requested amount |
| | | * @param n the produced element count, positive (not validated) |
| | | * @return the new amount |
| | | */ |
| | | public static long produced(@NonNull AtomicLong requested, long n) { |
| | | for (;;) { |
| | | long current = requested.get(); |
| | | if (current == Long.MAX_VALUE) { |
| | | return Long.MAX_VALUE; |
| | | } |
| | | long update = current - n; |
| | | if (update < 0L) { |
| | | RxJavaPlugins.onError(new IllegalStateException("More produced than requested: " + update)); |
| | | update = 0L; |
| | | } |
| | | if (requested.compareAndSet(current, update)) { |
| | | return update; |
| | | } |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Atomically subtract the given number (positive, not validated) from the target field if |
| | | * it doesn't contain {@link Long#MIN_VALUE} (indicating some cancelled state) or {@link Long#MAX_VALUE} (unbounded mode). |
| | | * @param requested the target field holding the current requested amount |
| | | * @param n the produced element count, positive (not validated) |
| | | * @return the new amount |
| | | */ |
| | | public static long producedCancel(@NonNull AtomicLong requested, long n) { |
| | | for (;;) { |
| | | long current = requested.get(); |
| | | if (current == Long.MIN_VALUE) { |
| | | return Long.MIN_VALUE; |
| | | } |
| | | if (current == Long.MAX_VALUE) { |
| | | return Long.MAX_VALUE; |
| | | } |
| | | long update = current - n; |
| | | if (update < 0L) { |
| | | RxJavaPlugins.onError(new IllegalStateException("More produced than requested: " + update)); |
| | | update = 0L; |
| | | } |
| | | if (requested.compareAndSet(current, update)) { |
| | | return update; |
| | | } |
| | | } |
| | | } |
| | | } |