/*
 * Decompiled with CFR 0.152.
 */
package com.palantir.foundry.sql.query;

import com.palantir.foundry.sql.driver.logging.DriverLoggerFactory;
import com.palantir.foundry.sql.query.ImmutableStream;
import com.palantir.logsafe.Arg;
import com.palantir.logsafe.DoNotLog;
import com.palantir.logsafe.Preconditions;
import com.palantir.logsafe.SafeArg;
import com.palantir.logsafe.exceptions.SafeRuntimeException;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.time.Duration;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.Executor;
import java.util.function.Supplier;
import org.slf4j.Logger;
import shadow.palantir.driver.com.google.common.io.ByteStreams;
import shadow.palantir.driver.com.palantir.common.streams.MoreStreams;
import shadow.palantir.driver.com.palantir.conjure.java.lib.SafeLong;
import shadow.palantir.driver.com.palantir.foundrysqlserver.com.palantir.foundry.sql.api.SqlQueryServiceV2Blocking;
import shadow.palantir.driver.com.palantir.foundrysqlserver.com.palantir.foundry.sql.api.Ticket;
import shadow.palantir.driver.com.palantir.foundrysqlserver.com.palantir.foundry.sql.api.TicketInfo;
import shadow.palantir.driver.com.palantir.tokens.auth.AuthHeader;
import shadow.palantir.driver.com.palantir.tracing.DeferredTracer;
import shadow.palantir.driver.org.apache.commons.io.IOUtils;
import shadow.palantir.driver.org.apache.commons.io.input.AutoCloseInputStream;
import shadow.palantir.driver.org.immutables.value.Value;

final class TicketDownloader
implements AutoCloseable {
    private static final Logger log = DriverLoggerFactory.getLogger(TicketDownloader.class);
    private static final int RETRY_COUNT = 3;
    private static final Duration RETRY_SLEEP_DURATION = Duration.ofSeconds(3L);
    private final Supplier<AuthHeader> authHeader;
    private final SqlQueryServiceV2Blocking client;
    private final boolean compress;
    private final int maxStreamSizeToBuffer;
    private final DeferredTracer queryTrace;
    private Iterator<StreamLoader> ticketStreams;
    private StreamLoader current = null;

    TicketDownloader(Supplier<AuthHeader> authHeader, SqlQueryServiceV2Blocking client, Executor bufferExecutor, List<TicketInfo> tickets, boolean compress, Optional<Integer> streamsToBuffer, Optional<Integer> maxStreamSizeToBuffer) {
        this.authHeader = authHeader;
        this.client = client;
        this.compress = compress;
        this.maxStreamSizeToBuffer = maxStreamSizeToBuffer.orElse(Integer.MAX_VALUE);
        this.queryTrace = new DeferredTracer("foundry-sql-driver: load-stream");
        this.ticketStreams = streamsToBuffer.isPresent() && streamsToBuffer.get() > 0 && tickets.size() > 1 ? MoreStreams.blockingStreamWithParallelism(tickets.stream(), this::bufferStreamIfWithinLimit, bufferExecutor, streamsToBuffer.get()).iterator() : tickets.stream().map(this::getStreamSupplier).iterator();
    }

    boolean next() {
        if (Preconditions.checkArgumentNotNull(this.ticketStreams).hasNext()) {
            this.current = this.ticketStreams.next();
            return true;
        }
        return false;
    }

    Stream loadCurrent(SafeLong row) {
        return this.queryTrace.withTrace(() -> this.current.load(row));
    }

    private StreamLoader bufferStreamIfWithinLimit(TicketInfo ticket) {
        if (ticket.getEstimatedSize().isPresent() && ticket.getEstimatedSize().get().longValue() < (long)this.maxStreamSizeToBuffer) {
            return this.bufferStream(ticket);
        }
        return this.getStreamSupplier(ticket);
    }

    private StreamLoader bufferStream(TicketInfo ticket) {
        try {
            byte[] bytes = this.bufferStreamToBytes(ticket);
            return row -> {
                Preconditions.checkState(row.longValue() == 0L, "Buffered stream cannot be loaded at an offset");
                return Stream.of(ticket.getTicket(), new ByteArrayInputStream(bytes));
            };
        }
        catch (IOException e) {
            throw new SafeRuntimeException("Failed buffering stream", (Throwable)e, new Arg[0]);
        }
    }

    private byte[] bufferStreamToBytes(TicketInfo ticket) throws IOException {
        int attempt = 0;
        while (true) {
            byte[] byArray;
            block10: {
                ++attempt;
                InputStream stream = this.getStream(ticket, SafeLong.of(0L)).stream();
                try {
                    byte[] byteArray = ByteStreams.toByteArray(stream);
                    log.debug("Buffered stream: {}MB", (Object)SafeArg.of("mb", byteArray.length / 1000000));
                    byArray = byteArray;
                    if (stream == null) break block10;
                }
                catch (Throwable throwable) {
                    try {
                        if (stream != null) {
                            try {
                                stream.close();
                            }
                            catch (Throwable throwable2) {
                                throwable.addSuppressed(throwable2);
                            }
                        }
                        throw throwable;
                    }
                    catch (Exception e) {
                        if (attempt >= 3) {
                            throw e;
                        }
                        log.warn("Failed buffering stream. Will retry.", e);
                        TicketDownloader.sleep(RETRY_SLEEP_DURATION);
                        continue;
                    }
                }
                stream.close();
            }
            return byArray;
            break;
        }
    }

    private StreamLoader getStreamSupplier(TicketInfo ticket) {
        return row -> this.getStream(ticket, row);
    }

    private Stream getStream(TicketInfo ticket, SafeLong row) {
        InputStream stream = null;
        try {
            stream = this.client.getStream(this.authHeader.get(), Optional.of(this.compress), Optional.of(row), Optional.empty(), ticket.getTicket());
            return Stream.of(ticket.getTicket(), ((AutoCloseInputStream.Builder)AutoCloseInputStream.builder().setInputStream(stream)).get());
        }
        catch (IOException e) {
            IOUtils.closeQuietly(stream, e::addSuppressed);
            throw new SafeRuntimeException("Failed opening stream", (Throwable)e, new Arg[0]);
        }
    }

    private static void sleep(Duration duration) {
        try {
            Thread.sleep(duration.toMillis());
        }
        catch (InterruptedException ex) {
            throw new RuntimeException(ex);
        }
    }

    @Override
    public void close() {
        this.current = null;
        this.ticketStreams = null;
    }

    @DoNotLog
    @Value.Immutable
    static interface Stream {
        @Value.Parameter
        public Ticket ticket();

        @Value.Parameter
        public InputStream stream();

        public static Stream of(Ticket ticket, InputStream stream) {
            return ImmutableStream.of(ticket, stream);
        }
    }

    @FunctionalInterface
    static interface StreamLoader {
        public Stream load(SafeLong var1);
    }
}

