/*
 * Decompiled with CFR 0.152.
 */
package com.palantir.foundry.sql.driver.statement;

import com.palantir.foundry.sql.driver.exception.ExceptionUtils;
import com.palantir.foundry.sql.driver.logging.Args;
import com.palantir.foundry.sql.driver.logging.DriverLoggerFactory;
import com.palantir.foundry.sql.driver.statement.StreamResponse;
import com.palantir.foundry.sql.driver.statement.StreamToken;
import com.palantir.logsafe.SafeArg;
import java.io.ByteArrayInputStream;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.io.SequenceInputStream;
import java.util.Arrays;
import java.util.Optional;
import java.util.function.Function;
import org.slf4j.Logger;
import shadow.palantir.driver.com.google.common.annotations.VisibleForTesting;
import shadow.palantir.driver.com.google.common.io.CountingInputStream;
import shadow.palantir.driver.com.palantir.foundrysqlserver.com.palantir.foundry.sql.api.QueryId;
import shadow.palantir.driver.com.palantir.tracing.DeferredTracer;
import shadow.palantir.driver.org.apache.commons.io.IOUtils;

/**
 * An {@link InputStream} over a result stream supplied by a loader function, which attempts to
 * resume from the current position when a read fails.
 *
 * <p>Every read is delegated to a {@link CountingInputStream}. When a read throws an
 * {@link IOException} and the last {@link StreamResponse} carried a {@link StreamToken}, a new
 * stream is requested from the loader using that token moved to
 * {@code token.streamOffset() + bytesReadFromCurrentDelegate}, and the read is retried once on the
 * new stream. A failure of the retried read propagates to the caller.
 *
 * <p>This class performs no synchronization of its own.
 */
public final class ReloadingResultStream
extends InputStream {
    private static final Logger log = DriverLoggerFactory.getLogger(ReloadingResultStream.class);
    /** Number of bytes the public constructor pre-reads into memory. */
    private static final int INITIAL_BUFFER_SIZE = 64000;
    private final Function<Optional<StreamToken>, StreamResponse> streamLoader;
    private final QueryId queryId;
    private final DeferredTracer trace;
    /** Token returned with the most recently loaded stream; empty when the response had none. */
    private Optional<StreamToken> currentToken = Optional.empty();
    /** Stream currently being read; its count is the number of bytes read since it was installed. */
    private CountingInputStream currentDelegate;

    /**
     * Creates a stream that pre-reads up to {@code INITIAL_BUFFER_SIZE} bytes.
     *
     * @param streamLoader loads a stream; called with an empty token for the initial load and with
     *     an offset-adjusted token when reloading
     * @param queryId identifier included in log messages
     * @param trace tracer under which every call to {@code streamLoader} is executed
     */
    public ReloadingResultStream(Function<Optional<StreamToken>, StreamResponse> streamLoader, QueryId queryId, DeferredTracer trace) {
        this(streamLoader, queryId, INITIAL_BUFFER_SIZE, trace);
    }

    /**
     * Creates a stream, loading it immediately and pre-reading up to {@code initialBufferSize}
     * bytes into memory. If the pre-read throws an {@link IOException}, the stream is loaded once
     * more from the start and no bytes are pre-read.
     *
     * @param streamLoader loads a stream for an optional token
     * @param queryId identifier included in log messages
     * @param initialBufferSize maximum number of bytes to pre-read
     * @param trace tracer under which every call to {@code streamLoader} is executed
     */
    @VisibleForTesting
    ReloadingResultStream(Function<Optional<StreamToken>, StreamResponse> streamLoader, QueryId queryId, int initialBufferSize, DeferredTracer trace) {
        this.streamLoader = streamLoader;
        this.queryId = queryId;
        this.trace = trace;
        byte[] initialBuffer = new byte[initialBufferSize];
        try {
            this.reloadStream(Optional.empty());
            int read = IOUtils.read(this.currentDelegate, initialBuffer);
            initialBuffer = ReloadingResultStream.trim(initialBuffer, read);
            // Replay the pre-read bytes ahead of the remaining stream. The fresh counting wrapper
            // starts at zero and counts the buffered bytes again as they are consumed.
            this.currentDelegate = new CountingInputStream(new SequenceInputStream(new ByteArrayInputStream(initialBuffer), this.currentDelegate));
        }
        catch (IOException e) {
            log.warn("Initial stream load failed, restarting: {}", Args.queryId(queryId), e);
            this.reloadStream(Optional.empty());
        }
    }

    /**
     * Reads one byte from the current delegate, reloading the stream and retrying once if the
     * first attempt throws.
     *
     * @return the byte read, or {@code -1} at end of stream
     * @throws IOException if the retried read fails
     */
    @Override
    public int read() throws IOException {
        try {
            return this.currentDelegate.read();
        }
        catch (IOException e) {
            this.handleExceptionAndReloadAtOffset(e);
            return this.currentDelegate.read();
        }
    }

    /** Equivalent to {@code read(bytes, 0, bytes.length)}. */
    @Override
    public int read(byte[] bytes) throws IOException {
        return this.read(bytes, 0, bytes.length);
    }

    /**
     * Reads up to {@code len} bytes from the current delegate, reloading the stream and retrying
     * once if the first attempt throws.
     *
     * @param bytes destination buffer
     * @param off index in {@code bytes} at which to start storing
     * @param len maximum number of bytes to read
     * @return the value returned by the delegate's {@code read(bytes, off, len)}
     * @throws IOException if the retried read fails
     */
    @Override
    public int read(byte[] bytes, int off, int len) throws IOException {
        try {
            return this.currentDelegate.read(bytes, off, len);
        }
        catch (IOException e) {
            this.handleExceptionAndReloadAtOffset(e);
            return this.currentDelegate.read(bytes, off, len);
        }
    }

    /** Closes the current delegate stream. */
    @Override
    public void close() throws IOException {
        IOUtils.close(this.currentDelegate);
    }

    /**
     * Replaces the current delegate with a stream reloaded at the offset reached so far.
     *
     * <p>If there is no current token, or if reloading fails, {@code exception} is handed to
     * {@link ExceptionUtils#throwUnchecked}. In the reload-failure case the reload exception is
     * first attached to {@code exception} as a suppressed exception so that it is not lost.
     *
     * @param exception the read failure that triggered the reload
     */
    private void handleExceptionAndReloadAtOffset(IOException exception) {
        if (!this.currentToken.isPresent()) {
            log.warn("Unable to reload due to missing stream token: {}", Args.queryId(this.queryId));
            ExceptionUtils.throwUnchecked(exception);
        }
        // Offset at which the current stream started plus the bytes read from it since.
        long currentOffset = this.currentToken.get().streamOffset() + this.currentDelegate.getCount();
        log.warn("Caught an exception whilst streaming results. Reloading stream at offset `{}`: {}", SafeArg.of("offset", currentOffset), Args.queryId(this.queryId), exception);
        Optional<StreamToken> reloadToken = this.currentToken.map(token -> token.atOffset(currentOffset));
        try {
            this.reloadStream(reloadToken);
        }
        catch (Exception e) {
            log.warn("Caught an exception whilst reloading stream: {}", Args.queryId(this.queryId), e);
            // Throwable.addSuppressed rejects self-suppression, hence the identity check.
            if (e != exception) {
                exception.addSuppressed(e);
            }
            ExceptionUtils.throwUnchecked(exception);
        }
    }

    /**
     * Quietly closes the current delegate, loads a new stream for {@code streamToken} under the
     * tracer, and installs the response's token and stream as the current state.
     *
     * @param streamToken token passed to the stream loader
     */
    private void reloadStream(Optional<StreamToken> streamToken) {
        IOUtils.closeQuietly(this.currentDelegate);
        StreamResponse response = this.trace.withTrace(() -> this.streamLoader.apply(streamToken));
        this.currentToken = response.token();
        this.currentDelegate = new CountingInputStream(response.stream());
    }

    /**
     * Returns the first {@code length} bytes of {@code bytes}; the array itself is returned when
     * it already has exactly that length.
     */
    private static byte[] trim(byte[] bytes, int length) {
        if (bytes.length == length) {
            return bytes;
        }
        return Arrays.copyOf(bytes, length);
    }
}

