/*
 * Decompiled with CFR 0.152.
 */
package shadow.palantir.driver.com.palantir.dialogue.core;

import com.palantir.logsafe.Safe;
import com.palantir.logsafe.SafeArg;
import com.palantir.logsafe.exceptions.SafeRuntimeException;
import com.palantir.logsafe.logger.SafeLogger;
import com.palantir.logsafe.logger.SafeLoggerFactory;
import java.util.Deque;
import java.util.Optional;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import shadow.palantir.driver.com.codahale.metrics.Counter;
import shadow.palantir.driver.com.codahale.metrics.Timer;
import shadow.palantir.driver.com.google.common.annotations.VisibleForTesting;
import shadow.palantir.driver.com.google.common.base.Suppliers;
import shadow.palantir.driver.com.google.common.util.concurrent.FutureCallback;
import shadow.palantir.driver.com.google.common.util.concurrent.Futures;
import shadow.palantir.driver.com.google.common.util.concurrent.ListenableFuture;
import shadow.palantir.driver.com.google.common.util.concurrent.SettableFuture;
import shadow.palantir.driver.com.palantir.dialogue.Channel;
import shadow.palantir.driver.com.palantir.dialogue.Endpoint;
import shadow.palantir.driver.com.palantir.dialogue.Request;
import shadow.palantir.driver.com.palantir.dialogue.Response;
import shadow.palantir.driver.com.palantir.dialogue.core.Config;
import shadow.palantir.driver.com.palantir.dialogue.core.DialogueClientMetrics;
import shadow.palantir.driver.com.palantir.dialogue.core.ImmutableDeferredCall;
import shadow.palantir.driver.com.palantir.dialogue.core.LimitedChannel;
import shadow.palantir.driver.com.palantir.dialogue.core.NeverThrowLimitedChannel;
import shadow.palantir.driver.com.palantir.dialogue.futures.DialogueFutures;
import shadow.palantir.driver.com.palantir.tracing.CloseableSpan;
import shadow.palantir.driver.com.palantir.tracing.DetachedSpan;
import shadow.palantir.driver.com.palantir.tracing.TagTranslator;
import shadow.palantir.driver.org.immutables.value.Value;

final class QueuedChannel
implements Channel {
    private static final SafeLogger log = SafeLoggerFactory.get(QueuedChannel.class);
    private static final LimitedChannel.LimitEnforcement DO_NOT_SKIP_LIMITS = LimitedChannel.LimitEnforcement.DEFAULT_ENABLED;
    private final Deque<DeferredCall> queuedCalls;
    private final NeverThrowLimitedChannel delegate;
    @Safe
    private final String channelName;
    @Safe
    private final String queueType;
    private final AtomicInteger queueSizeEstimate = new AtomicInteger(0);
    private final int maxQueueSize;
    private final Supplier<Counter> queueSizeCounter;
    private final Timer queuedTime;
    private final Supplier<ListenableFuture<Response>> limitedResultSupplier;
    private volatile boolean shouldRecordQueueMetrics;

    QueuedChannel(LimitedChannel delegate, @Safe String channelName, @Safe String queueType, QueuedChannelInstrumentation metrics, int maxQueueSize) {
        this.delegate = new NeverThrowLimitedChannel(delegate);
        this.channelName = channelName;
        this.queueType = queueType;
        this.queuedCalls = new ProtectedConcurrentLinkedDeque<DeferredCall>();
        this.maxQueueSize = maxQueueSize;
        this.queueSizeCounter = Suppliers.memoize(metrics::requestsQueued);
        this.queuedTime = metrics.requestQueuedTime();
        this.limitedResultSupplier = () -> Futures.immediateFailedFuture(new SafeRuntimeException("Unable to make a request (queue is full)", SafeArg.of("maxQueueSize", maxQueueSize)));
    }

    static QueuedChannel createForSticky(String channelName, int maxQueueSize, QueuedChannelInstrumentation queuedChannelInstrumentation, LimitedChannel delegate) {
        return new QueuedChannel(delegate, channelName, "sticky", queuedChannelInstrumentation, maxQueueSize);
    }

    static QueuedChannel create(Config cf, LimitedChannel delegate) {
        return new QueuedChannel(delegate, cf.channelName(), "channel", QueuedChannel.channelInstrumentation(DialogueClientMetrics.of(cf.clientConf().taggedMetricRegistry()), cf.channelName()), cf.maxQueueSize());
    }

    static QueuedChannel create(Config cf, Endpoint endpoint, LimitedChannel delegate) {
        return new QueuedChannel(delegate, cf.channelName(), "endpoint", QueuedChannel.endpointInstrumentation(DialogueClientMetrics.of(cf.clientConf().taggedMetricRegistry()), cf.channelName(), endpoint.serviceName(), endpoint.endpointName()), cf.maxQueueSize());
    }

    @Override
    public ListenableFuture<Response> execute(Endpoint endpoint, Request request) {
        return this.maybeExecute(endpoint, request).orElseGet(this.limitedResultSupplier);
    }

    @VisibleForTesting
    Optional<ListenableFuture<Response>> maybeExecute(Endpoint endpoint, Request request) {
        Optional<ListenableFuture<Response>> maybeResult;
        if (this.queueSizeEstimate.get() <= 0 && (maybeResult = this.delegate.maybeExecute(endpoint, request, DO_NOT_SKIP_LIMITS)).isPresent()) {
            ListenableFuture<Response> result = maybeResult.get();
            DialogueFutures.addDirectListener(result, this::onCompletion);
            if (this.shouldRecordQueueMetrics) {
                this.queuedTime.update(0L, TimeUnit.NANOSECONDS);
            }
            return maybeResult;
        }
        if (this.queueSizeEstimate.get() >= this.maxQueueSize) {
            return Optional.empty();
        }
        this.shouldRecordQueueMetrics = true;
        ImmutableDeferredCall components = DeferredCall.builder().endpoint(endpoint).request(request).response(SettableFuture.create()).span(DetachedSpan.start("Dialogue-request-enqueued")).timer(this.queuedTime.time()).build();
        if (!this.queuedCalls.offer(components)) {
            return Optional.empty();
        }
        int newSize = this.incrementQueueSize();
        if (log.isDebugEnabled()) {
            log.debug("Request queued {} on channel {}", SafeArg.of("queueSize", newSize), SafeArg.of("channelName", this.channelName));
        }
        this.schedule();
        return Optional.of(components.response());
    }

    private void onCompletion() {
        this.schedule();
    }

    @VisibleForTesting
    void schedule() {
        int numScheduled = 0;
        while (this.scheduleNextTask()) {
            ++numScheduled;
        }
        if (log.isDebugEnabled()) {
            log.debug("Scheduled {} requests on channel {}", SafeArg.of("numScheduled", numScheduled), SafeArg.of("channelName", this.channelName));
        }
    }

    private int incrementQueueSize() {
        this.queueSizeCounter.get().inc();
        return this.queueSizeEstimate.incrementAndGet();
    }

    private void decrementQueueSize() {
        this.queueSizeEstimate.decrementAndGet();
        this.queueSizeCounter.get().dec();
    }

    private boolean scheduleNextTask() {
        DeferredCall queueHead = this.queuedCalls.poll();
        if (queueHead == null) {
            return false;
        }
        SettableFuture<Response> queuedResponse = queueHead.response();
        if (queuedResponse.isDone()) {
            this.decrementQueueSize();
            queueHead.span().complete(QueuedChannelTagTranslator.INSTANCE, this);
            queueHead.timer().stop();
            return true;
        }
        try (CloseableSpan ignored = queueHead.span().attach();){
            Endpoint endpoint = queueHead.endpoint();
            Optional<ListenableFuture<Response>> maybeResponse = this.delegate.maybeExecute(endpoint, queueHead.request(), DO_NOT_SKIP_LIMITS);
            if (maybeResponse.isPresent()) {
                this.decrementQueueSize();
                ListenableFuture<Response> response = maybeResponse.get();
                queueHead.span().complete(QueuedChannelTagTranslator.INSTANCE, this);
                queueHead.timer().stop();
                DialogueFutures.addDirectCallback(response, new ForwardAndSchedule(queuedResponse));
                DialogueFutures.addDirectListener(queuedResponse, () -> {
                    if (queuedResponse.isCancelled() && !response.cancel(true) && log.isDebugEnabled()) {
                        log.debug("Failed to cancel delegate response, it should be reported by ForwardAndSchedule logging", SafeArg.of("channel", this.channelName), SafeArg.of("service", endpoint.serviceName()), SafeArg.of("endpoint", endpoint.endpointName()));
                    }
                });
                boolean bl = true;
                return bl;
            }
            if (!this.queuedCalls.offerFirst(queueHead)) {
                log.error("Failed to add an attempted call back to the deque", SafeArg.of("channel", this.channelName), SafeArg.of("service", endpoint.serviceName()), SafeArg.of("endpoint", endpoint.endpointName()));
                this.decrementQueueSize();
                queueHead.timer().stop();
                if (!queuedResponse.setException(new SafeRuntimeException("Failed to req-queue request", SafeArg.of("channel", this.channelName), SafeArg.of("service", endpoint.serviceName()), SafeArg.of("endpoint", endpoint.endpointName()))) && log.isDebugEnabled()) {
                    log.debug("Queued response has already been completed", SafeArg.of("channel", this.channelName), SafeArg.of("service", endpoint.serviceName()), SafeArg.of("endpoint", endpoint.endpointName()));
                }
            }
            boolean bl = false;
            return bl;
        }
    }

    public String toString() {
        return "QueuedChannel{queueSizeEstimate=" + this.queueSizeEstimate + ", maxQueueSize=" + this.maxQueueSize + ", delegate=" + this.delegate + "}";
    }

    static QueuedChannelInstrumentation channelInstrumentation(final DialogueClientMetrics metrics, final String channelName) {
        return new QueuedChannelInstrumentation(){

            @Override
            public Counter requestsQueued() {
                return metrics.requestsQueued(channelName);
            }

            @Override
            public Timer requestQueuedTime() {
                return metrics.requestQueuedTime(channelName);
            }
        };
    }

    static QueuedChannelInstrumentation stickyInstrumentation(final DialogueClientMetrics metrics, final String channelName) {
        return new MemoizedQueuedChannelInstrumentation(new QueuedChannelInstrumentation(){

            @Override
            public Counter requestsQueued() {
                return metrics.requestsStickyQueued(channelName);
            }

            @Override
            public Timer requestQueuedTime() {
                return metrics.requestStickyQueuedTime(channelName);
            }
        });
    }

    static QueuedChannelInstrumentation endpointInstrumentation(final DialogueClientMetrics metrics, final String channelName, final String service, final String endpoint) {
        return new QueuedChannelInstrumentation(){

            @Override
            public Counter requestsQueued() {
                return metrics.requestsEndpointQueued().channelName(channelName).serviceName(service).endpoint(endpoint).build();
            }

            @Override
            public Timer requestQueuedTime() {
                return metrics.requestEndpointQueuedTime().channelName(channelName).serviceName(service).endpoint(endpoint).build();
            }
        };
    }

    private static enum QueuedChannelTagTranslator implements TagTranslator<QueuedChannel>
    {
        INSTANCE;


        @Override
        public <T> void translate(TagTranslator.TagAdapter<T> adapter, T target, QueuedChannel data) {
            adapter.tag(target, "queue", data.queueType);
            adapter.tag(target, "channel", data.channelName);
        }
    }

    private static final class MemoizedQueuedChannelInstrumentation
    implements QueuedChannelInstrumentation {
        private final Supplier<Counter> requestsQueuedSupplier = Suppliers.memoize(delegate::requestsQueued);
        private final Supplier<Timer> requestQueuedTimeSupplier = Suppliers.memoize(delegate::requestQueuedTime);

        MemoizedQueuedChannelInstrumentation(QueuedChannelInstrumentation delegate) {
        }

        @Override
        public Counter requestsQueued() {
            return this.requestsQueuedSupplier.get();
        }

        @Override
        public Timer requestQueuedTime() {
            return this.requestQueuedTimeSupplier.get();
        }
    }

    static interface QueuedChannelInstrumentation {
        public Counter requestsQueued();

        public Timer requestQueuedTime();
    }

    private static final class ProtectedConcurrentLinkedDeque<T>
    extends ConcurrentLinkedDeque<T> {
        private ProtectedConcurrentLinkedDeque() {
        }

        @Override
        public int size() {
            throw new UnsupportedOperationException("size should never be called on a ConcurrentLinkedDeque");
        }
    }

    @Value.Immutable
    static interface DeferredCall {
        public Endpoint endpoint();

        public Request request();

        public SettableFuture<Response> response();

        public DetachedSpan span();

        public Timer.Context timer();

        public static Builder builder() {
            return new Builder();
        }

        public static class Builder
        extends ImmutableDeferredCall.Builder {
        }
    }

    private class ForwardAndSchedule
    implements FutureCallback<Response> {
        private final SettableFuture<Response> response;

        ForwardAndSchedule(SettableFuture<Response> response) {
            this.response = response;
        }

        @Override
        public void onSuccess(Response result) {
            if (!this.response.set(result)) {
                result.close();
            }
            QueuedChannel.this.schedule();
        }

        @Override
        public void onFailure(Throwable throwable) {
            if (!this.response.setException(throwable)) {
                if (throwable instanceof CancellationException) {
                    log.debug("Call was canceled", throwable);
                } else {
                    log.info("Call failed after the future completed", throwable);
                }
            }
            QueuedChannel.this.schedule();
        }
    }
}

