/*
 * Decompiled with CFR 0.152.
 */
package shadow.palantir.driver.com.palantir.dialogue.core;

import com.palantir.logsafe.Arg;
import com.palantir.logsafe.SafeArg;
import com.palantir.logsafe.exceptions.SafeIllegalStateException;
import com.palantir.logsafe.exceptions.SafeRuntimeException;
import com.palantir.logsafe.logger.SafeLogger;
import com.palantir.logsafe.logger.SafeLoggerFactory;
import java.io.IOException;
import java.io.OutputStream;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.time.Duration;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.OptionalLong;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
import java.util.function.DoubleSupplier;
import java.util.function.Function;
import java.util.function.Supplier;
import shadow.palantir.driver.com.codahale.metrics.Meter;
import shadow.palantir.driver.com.google.common.annotations.VisibleForTesting;
import shadow.palantir.driver.com.google.common.base.Suppliers;
import shadow.palantir.driver.com.google.common.util.concurrent.FutureCallback;
import shadow.palantir.driver.com.google.common.util.concurrent.Futures;
import shadow.palantir.driver.com.google.common.util.concurrent.ListenableFuture;
import shadow.palantir.driver.com.google.common.util.concurrent.ListeningScheduledExecutorService;
import shadow.palantir.driver.com.google.common.util.concurrent.MoreExecutors;
import shadow.palantir.driver.com.google.common.util.concurrent.SettableFuture;
import shadow.palantir.driver.com.google.common.util.concurrent.ThreadFactoryBuilder;
import shadow.palantir.driver.com.palantir.conjure.java.client.config.ClientConfiguration;
import shadow.palantir.driver.com.palantir.dialogue.Endpoint;
import shadow.palantir.driver.com.palantir.dialogue.EndpointChannel;
import shadow.palantir.driver.com.palantir.dialogue.HttpMethod;
import shadow.palantir.driver.com.palantir.dialogue.Request;
import shadow.palantir.driver.com.palantir.dialogue.RequestBody;
import shadow.palantir.driver.com.palantir.dialogue.Response;
import shadow.palantir.driver.com.palantir.dialogue.core.Config;
import shadow.palantir.driver.com.palantir.dialogue.core.DialogueClientMetrics;
import shadow.palantir.driver.com.palantir.dialogue.core.DialogueExecutors;
import shadow.palantir.driver.com.palantir.dialogue.core.MeshMode;
import shadow.palantir.driver.com.palantir.dialogue.core.Responses;
import shadow.palantir.driver.com.palantir.dialogue.futures.DialogueFutures;
import shadow.palantir.driver.com.palantir.tracing.DetachedSpan;
import shadow.palantir.driver.com.palantir.tracing.TagTranslator;
import shadow.palantir.driver.com.palantir.tracing.Tracers;
import shadow.palantir.driver.com.palantir.tritium.metrics.MetricRegistries;
import shadow.palantir.driver.com.palantir.tritium.metrics.registry.DefaultTaggedMetricRegistry;
import shadow.palantir.driver.com.palantir.tritium.metrics.registry.SharedTaggedMetricRegistries;
import shadow.palantir.driver.com.palantir.tritium.metrics.registry.TaggedMetricRegistry;
import shadow.palantir.driver.javax.annotation.Nullable;

final class RetryingChannel
implements EndpointChannel {
    private static final SafeLogger log = SafeLoggerFactory.get(RetryingChannel.class);
    // Name used both for the scheduler's metrics/tracing instrumentation and as the thread-name prefix below.
    private static final String SCHEDULER_NAME = "dialogue-RetryingChannel-scheduler";
    // Memoized supplier: the scheduler is built on first use from an instrumented daemon thread factory.
    // It is the scheduler used by the @VisibleForTesting constructor.
    static final Supplier<ScheduledExecutorService> sharedScheduler = Suppliers.memoize(() -> DialogueExecutors.newSharedSingleThreadScheduler(MetricRegistries.instrument(SharedTaggedMetricRegistries.getSingleton(), new ThreadFactoryBuilder().setNameFormat("dialogue-RetryingChannel-scheduler-%d").setDaemon(true).build(), SCHEDULER_NAME)));
    // Builds the throwable attached to retry log lines for retryable (QoS) responses; the endpoint is unused.
    private static final BiFunction<Endpoint, Response, Throwable> qosThrowable = (_endpoint, response) -> new SafeRuntimeException("Received retryable response", SafeArg.of("status", response.code()));
    // Builds the throwable attached to retry log lines for server errors on methods considered safe to retry.
    private static final BiFunction<Endpoint, Response, Throwable> serverErrorThrowable = (endpoint, response) -> new SafeRuntimeException("Received server error, but http method is safe to retry", SafeArg.of("status", response.code()), SafeArg.of("method", endpoint.httpMethod()));
    // Instrumented scheduler on which delayed (backed-off) retries are scheduled.
    private final ListeningScheduledExecutorService scheduler;
    // Channel that performs each individual attempt.
    private final EndpointChannel delegate;
    private final Endpoint endpoint;
    // Used in log lines, span tags and as the channelName tag of the retry meters.
    private final String channelName;
    // A failed attempt is retried only while the running failure count is <= this value.
    private final int maxRetries;
    // Selects which QoS responses are retried here versus returned to the caller.
    private final ClientConfiguration.ServerQoS serverQoS;
    // When DISABLED, read timeouts and "Connection timed out" socket errors are not retried.
    private final ClientConfiguration.RetryOnTimeout retryOnTimeout;
    // Base unit of the backoff; multiplied by the jitter value and by 2^(failures - 1).
    private final Duration backoffSlotSize;
    // Source of the multiplicative jitter factor applied to each backoff.
    private final DoubleSupplier jitter;
    // Meters marked each time a retry is scheduled, one per retry reason.
    private final Supplier<Meter> retryDueToServerError;
    private final Supplier<Meter> retryDueToQosResponse;
    // Resolves the meter for a client-side failure; the reason tag is the throwable's simple class name.
    private final Function<Throwable, Meter> retryDueToThrowable;

    /**
     * Wraps {@code channel} with retry behavior when the configuration asks for it.
     *
     * <p>The channel is returned unwrapped when the configured retry count is zero, or when the mesh mode is
     * {@link MeshMode#USE_EXTERNAL_MESH} (the ignored retry count is then reported at debug level).
     *
     * @param cf configuration supplying the client configuration, channel name, mesh mode, scheduler and random
     * @param channel the channel that performs each attempt
     * @param endpoint the endpoint the returned channel is bound to
     * @return {@code channel} itself, or a new {@code RetryingChannel} delegating to it
     */
    static EndpointChannel create(Config cf, EndpointChannel channel, Endpoint endpoint) {
        ClientConfiguration conf = cf.clientConf();
        int configuredRetries = conf.maxNumRetries();
        if (configuredRetries == 0) {
            return channel;
        }
        boolean externalMesh = cf.mesh() == MeshMode.USE_EXTERNAL_MESH;
        if (!externalMesh) {
            return new RetryingChannel(
                    channel,
                    endpoint,
                    cf.channelName(),
                    conf.taggedMetricRegistry(),
                    configuredRetries,
                    conf.backoffSlotSize(),
                    conf.serverQoS(),
                    conf.retryOnTimeout(),
                    cf.scheduler(),
                    cf.random()::nextDouble);
        }
        if (log.isDebugEnabled()) {
            log.debug(
                    "Disabling retrying channel due to MeshMode",
                    SafeArg.of("channel", cf.channelName()),
                    SafeArg.of("ignoredMaxNumRetries", configuredRetries));
        }
        return channel;
    }

    /**
     * Test-only constructor. Supplies a fresh {@link DefaultTaggedMetricRegistry}, the lazily created
     * {@link #sharedScheduler}, and {@link ThreadLocalRandom} as the jitter source.
     */
    @VisibleForTesting
    RetryingChannel(
            EndpointChannel delegate,
            Endpoint endpoint,
            String channelName,
            int maxRetries,
            Duration backoffSlotSize,
            ClientConfiguration.ServerQoS serverQoS,
            ClientConfiguration.RetryOnTimeout retryOnTimeout) {
        this(
                delegate,
                endpoint,
                channelName,
                new DefaultTaggedMetricRegistry(),
                maxRetries,
                backoffSlotSize,
                serverQoS,
                retryOnTimeout,
                sharedScheduler.get(),
                () -> ThreadLocalRandom.current().nextDouble());
    }

    /**
     * Primary constructor: stores the retry settings, instruments the supplied scheduler against
     * {@code metrics}, and prepares the retry meters (one per retry reason, tagged with the channel name).
     */
    private RetryingChannel(
            EndpointChannel delegate,
            Endpoint endpoint,
            String channelName,
            TaggedMetricRegistry metrics,
            int maxRetries,
            Duration backoffSlotSize,
            ClientConfiguration.ServerQoS serverQoS,
            ClientConfiguration.RetryOnTimeout retryOnTimeout,
            ScheduledExecutorService scheduler,
            DoubleSupplier jitter) {
        // What to call and how it is identified in logs and metrics.
        this.delegate = delegate;
        this.endpoint = endpoint;
        this.channelName = channelName;

        // Retry policy.
        this.maxRetries = maxRetries;
        this.backoffSlotSize = backoffSlotSize;
        this.serverQoS = serverQoS;
        this.retryOnTimeout = retryOnTimeout;

        // Backoff machinery.
        this.scheduler = instrument(scheduler, metrics);
        this.jitter = jitter;

        // Server-error and QoS meters are built once on first use; throwable meters are resolved per failure.
        DialogueClientMetrics clientMetrics = DialogueClientMetrics.of(metrics);
        this.retryDueToServerError = Suppliers.memoize(
                () -> clientMetrics.requestRetry().channelName(channelName).reason("serverError").build());
        this.retryDueToQosResponse = Suppliers.memoize(
                () -> clientMetrics.requestRetry().channelName(channelName).reason("qosResponse").build());
        this.retryDueToThrowable = failure -> clientMetrics
                .requestRetry()
                .channelName(channelName)
                .reason(failure.getClass().getSimpleName())
                .build();
    }

    /**
     * Executes {@code request} against the delegate, retrying according to this channel's settings.
     *
     * <p>When debug logging is enabled, an exception is created up front purely to capture the caller's
     * stack trace so it can later be attached to failures and log lines.
     */
    @Override
    public ListenableFuture<Response> execute(Request request) {
        Optional<SafeRuntimeException> callsite;
        if (log.isDebugEnabled()) {
            callsite = Optional.of(new SafeRuntimeException("Exception for stacktrace", new Arg[0]));
        } else {
            callsite = Optional.empty();
        }
        RetryingCallback attempt = new RetryingCallback(this.endpoint, request, callsite);
        return attempt.execute();
    }

    /**
     * Returns a diagnostic description listing the retry limit, the QoS mode and the delegate channel.
     * Fields are uniformly separated by {@code ", "}.
     */
    @Override
    public String toString() {
        return "RetryingChannel{maxRetries=" + this.maxRetries + ", serverQoS=" + this.serverQoS + ", delegate=" + this.delegate + "}";
    }

    /**
     * Reports whether {@code throwable} is exactly a {@link SocketException} (not a subclass) whose message
     * is {@code "Connection timed out"}. A {@code null} argument yields {@code false}.
     */
    private static boolean isEtimedoutException(Throwable throwable) {
        if (throwable == null) {
            return false;
        }
        if (!SocketException.class.equals(throwable.getClass())) {
            return false;
        }
        return "Connection timed out".equals(throwable.getMessage());
    }

    /**
     * Returns a request whose non-repeatable body is wrapped in a {@link ConsumptionTrackingRequestBody},
     * so it can later be determined whether the body has already been written or closed.
     * Requests with no body or with a repeatable body are returned unchanged.
     */
    private static Request trackNonRepeatableBodyConsumption(Request request) {
        Optional<RequestBody> maybeBody = request.body();
        boolean needsTracking = maybeBody.isPresent() && !maybeBody.get().repeatable();
        if (!needsTracking) {
            return request;
        }
        RequestBody tracked = new ConsumptionTrackingRequestBody(maybeBody.get());
        return Request.builder().from(request).body(tracked).build();
    }

    /**
     * Classifies an HTTP method for the purpose of retrying after a server error:
     * GET, HEAD, OPTIONS, PUT and DELETE are retried, POST and PATCH are not.
     *
     * @throws SafeIllegalStateException if the method is not one of the values listed above
     */
    private static boolean safeToRetry(HttpMethod httpMethod) {
        final boolean retrySafe;
        switch (httpMethod) {
            case POST:
            case PATCH:
                retrySafe = false;
                break;
            case GET:
            case HEAD:
            case OPTIONS:
            case PUT:
            case DELETE:
                retrySafe = true;
                break;
            default:
                throw new SafeIllegalStateException("Unknown method", SafeArg.of("httpMethod", httpMethod));
        }
        return retrySafe;
    }

    /**
     * Decorates {@code delegate} in three layers: metrics instrumentation against {@code metrics},
     * tracing, and finally a Guava listening decorator. All layers use {@code SCHEDULER_NAME}.
     */
    private static ListeningScheduledExecutorService instrument(ScheduledExecutorService delegate, TaggedMetricRegistry metrics) {
        ScheduledExecutorService metered = MetricRegistries.instrument(metrics, delegate, SCHEDULER_NAME);
        ScheduledExecutorService traced = Tracers.wrap(SCHEDULER_NAME, metered);
        return MoreExecutors.listeningDecorator(traced);
    }

    /**
     * {@link RequestBody} decorator that remembers whether the wrapped body has been written or closed.
     * All other operations are forwarded to the wrapped body unchanged.
     */
    private static final class ConsumptionTrackingRequestBody implements RequestBody {
        private final RequestBody delegate;
        // Set on the first writeTo or close call and never cleared; volatile so other threads see the update.
        private volatile boolean consumed;

        ConsumptionTrackingRequestBody(RequestBody delegate) {
            this.delegate = delegate;
        }

        /** Marks the body as consumed before forwarding the write. */
        @Override
        public void writeTo(OutputStream output) throws IOException {
            consumed = true;
            delegate.writeTo(output);
        }

        @Override
        public String contentType() {
            return delegate.contentType();
        }

        @Override
        public OptionalLong contentLength() {
            return delegate.contentLength();
        }

        @Override
        public boolean repeatable() {
            return delegate.repeatable();
        }

        /** Marks the body as consumed before forwarding the close. */
        @Override
        public void close() {
            consumed = true;
            delegate.close();
        }

        /** A body may be sent again if it is repeatable, or if it has not been written or closed yet. */
        boolean requestBodyCanBeRetried() {
            if (repeatable()) {
                return true;
            }
            return !consumed;
        }

        @Override
        public String toString() {
            return "ConsumptionTrackingRequestBody{" + delegate + "}";
        }
    }

    private final class RetryingCallback {
        private final Endpoint endpoint;
        // The request sent on every attempt; a non-repeatable body is wrapped for consumption tracking.
        private final Request request;
        // Present only when the caller captured a stack trace (done when debug logging is enabled).
        private final Optional<SafeRuntimeException> callsiteStacktrace;
        // Started when this callback is created; completed in execute() only if at least one failure occurred.
        private final DetachedSpan span = DetachedSpan.start("Dialogue-RetryingChannel");
        // Number of failed attempts so far; incremented by handleThrowable and incrementFailuresAndMaybeRetry.
        private int failures = 0;

        /**
         * Creates the per-request retry state. The request is stored with a consumption-tracking
         * wrapper around any non-repeatable body.
         */
        private RetryingCallback(Endpoint endpoint, Request request, Optional<SafeRuntimeException> callsiteStacktrace) {
            this.callsiteStacktrace = callsiteStacktrace;
            this.endpoint = endpoint;
            this.request = trackNonRepeatableBodyConsumption(request);
        }

        /**
         * Sends the first attempt and returns a future that completes with the final outcome after
         * any retries. When the returned future completes, the span is completed only if at least
         * one failure was recorded.
         */
        ListenableFuture<Response> execute() {
            ListenableFuture<Response> firstAttempt = RetryingChannel.this.delegate.execute(this.request);
            ListenableFuture<Response> outcome = this.wrap(firstAttempt);
            Runnable completeSpanIfRetried = () -> {
                if (this.failures > 0) {
                    this.span.complete(RetryingCallbackTranslator.INSTANCE, this);
                }
            };
            outcome.addListener(completeSpanIfRetried, DialogueFutures.safeDirectExecutor());
            return outcome;
        }

        /**
         * Reports whether the request may be sent again: always for body-less requests, otherwise
         * according to the consumption tracker, or to the body's own repeatability when untracked.
         */
        private boolean requestCanBeRetried() {
            Optional<RequestBody> maybeBody = this.request.body();
            if (!maybeBody.isPresent()) {
                return true;
            }
            RequestBody body = maybeBody.get();
            return body instanceof ConsumptionTrackingRequestBody
                    ? ((ConsumptionTrackingRequestBody) body).requestBodyCanBeRetried()
                    : body.repeatable();
        }

        /**
         * Attaches the retry handlers to an attempt: responses go through {@code handleHttpResponse}
         * first, and any throwable from the resulting future then goes through {@code handleThrowable}.
         */
        private ListenableFuture<Response> wrap(ListenableFuture<Response> input) {
            ListenableFuture<Response> responseHandled = DialogueFutures.transformAsync(input, this::handleHttpResponse);
            return DialogueFutures.catchingAllAsync(responseHandled, this::handleThrowable);
        }

        /**
         * Decides what to do with a received response. If the request can be sent again, retryable
         * QoS responses and server errors on retry-safe methods are handed to the retry path;
         * every other response is returned to the caller as-is.
         */
        private ListenableFuture<Response> handleHttpResponse(Response response) {
            if (!requestCanBeRetried()) {
                return Futures.immediateFuture(response);
            }
            if (isRetryableQosStatus(response)) {
                Meter qosMeter = retryDueToQosResponse.get();
                return incrementFailuresAndMaybeRetry(response, qosThrowable, qosMeter);
            }
            boolean retryableServerError =
                    Responses.isInternalServerError(response) && safeToRetry(this.endpoint.httpMethod());
            if (retryableServerError) {
                Meter serverErrorMeter = retryDueToServerError.get();
                return incrementFailuresAndMaybeRetry(response, serverErrorThrowable, serverErrorMeter);
            }
            return Futures.immediateFuture(response);
        }

        /**
         * Handles a failure raised while executing an attempt.
         *
         * <p>The failure is retried, after a jittered backoff, when the retry budget is not exhausted,
         * the request can be sent again and the throwable is of a retryable kind. Otherwise the
         * original throwable is propagated through a failed future.
         *
         * @param clientSideThrowable the failure produced by the attempt
         * @return the future of the scheduled retry, or a failed future carrying {@code clientSideThrowable}
         */
        private ListenableFuture<Response> handleThrowable(Throwable clientSideThrowable) {
            // Every failure counts against the retry budget, whether or not it ends up being retried.
            if (++this.failures <= RetryingChannel.this.maxRetries) {
                if (this.requestCanBeRetried() && this.shouldAttemptToRetry(clientSideThrowable)) {
                    // Attach the caller's stack trace (if captured) so the logged failure shows the call site.
                    this.callsiteStacktrace.ifPresent(clientSideThrowable::addSuppressed);
                    Meter retryReason = RetryingChannel.this.retryDueToThrowable.apply(clientSideThrowable);
                    long backoffNanoseconds = this.getBackoffNanoseconds();
                    this.infoLogRetry(backoffNanoseconds, OptionalInt.empty(), clientSideThrowable);
                    return this.scheduleRetry(retryReason, backoffNanoseconds);
                }
                // Budget remains but this failure is not retried; a single debug guard is sufficient.
                if (log.isDebugEnabled()) {
                    this.callsiteStacktrace.ifPresent(clientSideThrowable::addSuppressed);
                    log.debug("Not attempting to retry failure. channel: {}, service: {}, endpoint: {}", SafeArg.of("channelName", RetryingChannel.this.channelName), SafeArg.of("serviceName", this.endpoint.serviceName()), SafeArg.of("endpoint", this.endpoint.endpointName()), clientSideThrowable);
                }
            }
            return Futures.immediateFailedFuture(clientSideThrowable);
        }

        /**
         * Records a failed response and retries if the budget allows.
         *
         * <p>When retries are exhausted the response is returned to the caller unclosed. Otherwise the
         * response is closed and a retry is scheduled: immediately for "retry other" responses, or
         * after a jittered backoff for everything else. The descriptive throwable is only built when
         * trace logging is enabled.
         */
        private ListenableFuture<Response> incrementFailuresAndMaybeRetry(Response response, BiFunction<Endpoint, Response, Throwable> failureSupplier, Meter meter) {
            this.failures += 1;
            if (this.failures > maxRetries) {
                infoLogRetriesExhausted(response);
                return Futures.immediateFuture(response);
            }
            response.close();
            Throwable throwableToLog = null;
            if (log.isTraceEnabled()) {
                throwableToLog = failureSupplier.apply(this.endpoint, response);
            }
            long backoffNanos = 0L;
            if (!Responses.isRetryOther(response)) {
                backoffNanos = getBackoffNanoseconds();
            }
            infoLogRetry(backoffNanos, OptionalInt.of(response.code()), throwableToLog);
            return scheduleRetry(meter, backoffNanos);
        }

        /**
         * Marks {@code meter} and issues the next attempt, either immediately or after a delay.
         *
         * @param meter the retry-reason meter to mark
         * @param backoffNanoseconds delay before the next attempt; values {@code <= 0} retry immediately
         * @return a future for the retried request, wrapped with the same response/throwable handlers
         */
        private ListenableFuture<Response> scheduleRetry(Meter meter, long backoffNanoseconds) {
            meter.mark();
            // No backoff: re-execute directly without going through the scheduler.
            if (backoffNanoseconds <= 0L) {
                return this.wrap(RetryingChannel.this.delegate.execute(this.request));
            }
            // Child span opened here and completed when the scheduled task starts running.
            DetachedSpan backoffSpan = this.span.childDetachedSpan("retry-backoff");
            // Handed to the caller immediately; completed later from the delegate's result.
            final SettableFuture<Response> responseFuture = SettableFuture.create();
            RetryingChannel.this.scheduler.schedule(() -> {
                backoffSpan.complete(RetryingCallbackTranslator.INSTANCE, this);
                // Already completed (e.g. cancelled while waiting): do not send another request.
                if (responseFuture.isDone()) {
                    return;
                }
                final ListenableFuture<Response> delegateResult = RetryingChannel.this.delegate.execute(this.request);
                DialogueFutures.addDirectCallback(delegateResult, new FutureCallback<Response>(){

                    @Override
                    public void onSuccess(Response result) {
                        // If the future was completed by someone else first, close the unused response.
                        if (!responseFuture.set(result)) {
                            result.close();
                        }
                    }

                    @Override
                    public void onFailure(Throwable throwable) {
                        // Mirror a cancelled delegate as cancellation; otherwise forward the failure.
                        if (delegateResult.isCancelled()) {
                            responseFuture.cancel(false);
                        } else if (!responseFuture.setException(throwable)) {
                            log.info("Response future completed before delegate threw", throwable);
                        }
                    }
                });
                // Propagate cancellation of the returned future to the in-flight delegate call.
                DialogueFutures.addDirectListener(responseFuture, () -> {
                    if (responseFuture.isCancelled()) {
                        delegateResult.cancel(false);
                    }
                });
            }, backoffNanoseconds, TimeUnit.NANOSECONDS);
            return this.wrap(responseFuture);
        }

        /**
         * Computes the delay before the next attempt: {@code backoffSlotSize * jitter * 2^(failures - 1)},
         * rounded to the nearest nanosecond. Returns zero when no failure has been recorded yet.
         */
        private long getBackoffNanoseconds() {
            if (this.failures == 0) {
                return 0L;
            }
            int slots = (int) Math.pow(2.0, this.failures - 1);
            double slotNanos = (double) backoffSlotSize.toNanos();
            double jitterFactor = jitter.getAsDouble();
            // Multiplication order is kept as slot * jitter * slots so the rounded result is unchanged.
            return Math.round(slotNanos * jitterFactor * (double) slots);
        }

        /**
         * Reports whether {@code response} is a QoS response that this channel should retry under the
         * configured {@code serverQoS} mode. With {@code PROPAGATE_429_and_503_TO_CALLER}, QoS responses
         * that are "too many requests" or "unavailable" are not retried here.
         *
         * @throws SafeIllegalStateException if the configured mode is not one of the two known values
         */
        private boolean isRetryableQosStatus(Response response) {
            ClientConfiguration.ServerQoS mode = RetryingChannel.this.serverQoS;
            switch (mode) {
                case AUTOMATIC_RETRY:
                    return Responses.isQosStatus(response);
                case PROPAGATE_429_and_503_TO_CALLER:
                    return Responses.isQosStatus(response)
                            && !(Responses.isTooManyRequests(response) || Responses.isUnavailable(response));
                default:
                    throw new SafeIllegalStateException("Encountered unknown propagate QoS configuration", SafeArg.of("serverQoS", mode));
            }
        }

        /**
         * Reports whether a client-side failure is of a kind worth retrying.
         *
         * <p>Only {@link IOException}s are retried. When retry-on-timeout is disabled, a
         * {@link SocketTimeoutException} is retried only if its message contains
         * {@code "connect timed out"}, and a "Connection timed out" socket error is never retried.
         */
        private boolean shouldAttemptToRetry(Throwable throwable) {
            boolean timeoutRetriesDisabled = retryOnTimeout == ClientConfiguration.RetryOnTimeout.DISABLED;
            if (timeoutRetriesDisabled && throwable instanceof SocketTimeoutException) {
                String message = throwable.getMessage();
                return message != null && message.contains("connect timed out");
            }
            if (timeoutRetriesDisabled && isEtimedoutException(throwable)) {
                return false;
            }
            return throwable instanceof IOException;
        }

        /**
         * Logs at info level that the retry budget is spent and the last response is being returned,
         * attaching the captured call-site stack trace when one is available.
         */
        private void infoLogRetriesExhausted(Response response) {
            if (!log.isInfoEnabled()) {
                return;
            }
            SafeRuntimeException callsite = this.callsiteStacktrace.orElse(null);
            log.info(
                    "Exhausted {} retries, returning last received response with status {}, channel: {}, service: {}, endpoint: {}",
                    SafeArg.of("retries", maxRetries),
                    SafeArg.of("status", response.code()),
                    SafeArg.of("channelName", RetryingChannel.this.channelName),
                    SafeArg.of("serviceName", this.endpoint.serviceName()),
                    SafeArg.of("endpoint", this.endpoint.endpointName()),
                    callsite);
        }

        /**
         * Logs at info level that a retry is about to happen.
         *
         * @param backoffNanoseconds delay before the retry; reported in milliseconds
         * @param responseStatus status code of the failed response, or empty for client-side failures
         * @param throwable optional throwable to attach to the log line
         */
        private void infoLogRetry(long backoffNanoseconds, OptionalInt responseStatus, @Nullable Throwable throwable) {
            if (!log.isInfoEnabled()) {
                return;
            }
            long backoffMillis = TimeUnit.NANOSECONDS.toMillis(backoffNanoseconds);
            Integer status = responseStatus.isPresent() ? Integer.valueOf(responseStatus.getAsInt()) : null;
            log.info(
                    "Retrying call after failure {}/{} backoff: {}, channel: {}, service: {}, endpoint: {}, status: {}",
                    SafeArg.of("failures", this.failures),
                    SafeArg.of("maxRetries", maxRetries),
                    SafeArg.of("backoffMillis", backoffMillis),
                    SafeArg.of("channelName", RetryingChannel.this.channelName),
                    SafeArg.of("serviceName", this.endpoint.serviceName()),
                    SafeArg.of("endpoint", this.endpoint.endpointName()),
                    SafeArg.of("status", status),
                    throwable);
        }

        /** Exposes the enclosing channel's name; read by the span tag translator. */
        private String channelName() {
            String enclosingChannelName = RetryingChannel.this.channelName;
            return enclosingChannelName;
        }
    }

    private static enum RetryingCallbackTranslator implements TagTranslator<RetryingCallback>
    {
        INSTANCE;


        @Override
        public <T> void translate(TagTranslator.TagAdapter<T> sink, T target, RetryingCallback data) {
            sink.tag(target, "serviceName", data.endpoint.serviceName());
            sink.tag(target, "endpointName", data.endpoint.endpointName());
            sink.tag(target, "failures", Integer.toString(data.failures));
            sink.tag(target, "channel", data.channelName());
        }
    }
}

