/*
 * Decompiled with CFR 0.152.
 */
package com.palantir.foundry.sql.driver.statement;

import com.palantir.foundry.sql.api.SqlQueryServiceRetryableDriverClient;
import com.palantir.foundry.sql.collection.utils.AsyncCloseInputStream;
import com.palantir.foundry.sql.collection.utils.ReadDelayInputStream;
import com.palantir.foundry.sql.driver.connection.ConnectionDetails;
import com.palantir.foundry.sql.driver.exception.ExceptionUtils;
import com.palantir.foundry.sql.driver.exception.UncheckedSqlException;
import com.palantir.foundry.sql.driver.logging.Args;
import com.palantir.foundry.sql.driver.logging.DriverLoggerFactory;
import com.palantir.foundry.sql.driver.logging.RemoteLogger;
import com.palantir.foundry.sql.driver.results.AutoCloseableIterator;
import com.palantir.foundry.sql.driver.results.LatitudeDriverRow;
import com.palantir.foundry.sql.driver.schema.SchemaConverter;
import com.palantir.foundry.sql.driver.schema.TimezoneConverter;
import com.palantir.foundry.sql.driver.statement.ExecutedQuery;
import com.palantir.foundry.sql.driver.statement.FriendlyExceptionResultIterator;
import com.palantir.foundry.sql.driver.statement.QueryManager;
import com.palantir.foundry.sql.driver.statement.QueryResult;
import com.palantir.foundry.sql.driver.statement.ReloadingResultStream;
import com.palantir.foundry.sql.driver.statement.StreamResponse;
import com.palantir.foundry.sql.driver.statement.StreamToken;
import com.palantir.logsafe.Arg;
import com.palantir.logsafe.Preconditions;
import com.palantir.logsafe.SafeArg;
import com.palantir.logsafe.UnsafeArg;
import com.palantir.logsafe.exceptions.SafeIllegalStateException;
import java.io.InputStream;
import java.sql.SQLException;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.Executors;
import java.util.function.Function;
import java.util.function.Supplier;
import latitude.api.results.LatitudeResult;
import org.slf4j.Logger;
import shadow.palantir.driver.com.google.common.annotations.VisibleForTesting;
import shadow.palantir.driver.com.google.common.base.Stopwatch;
import shadow.palantir.driver.com.google.common.base.Ticker;
import shadow.palantir.driver.com.google.common.collect.Iterables;
import shadow.palantir.driver.com.google.common.io.CountingInputStream;
import shadow.palantir.driver.com.google.common.util.concurrent.ListeningScheduledExecutorService;
import shadow.palantir.driver.com.google.common.util.concurrent.MoreExecutors;
import shadow.palantir.driver.com.google.common.util.concurrent.ThreadFactoryBuilder;
import shadow.palantir.driver.com.palantir.contour.ipc.LatitudeResultDeserializer;
import shadow.palantir.driver.com.palantir.contour.ipc.LatitudeResultWrapper;
import shadow.palantir.driver.com.palantir.contour.ipc.LatitudeRow;
import shadow.palantir.driver.com.palantir.contour.ipc.StreamingTableLatitudeResult;
import shadow.palantir.driver.com.palantir.dialogue.Response;
import shadow.palantir.driver.com.palantir.foundry.schemas.api.types.FoundryFieldSchema;
import shadow.palantir.driver.com.palantir.foundrysqlserver.com.palantir.foundry.sql.api.CanceledQueryStatus;
import shadow.palantir.driver.com.palantir.foundrysqlserver.com.palantir.foundry.sql.api.FailedQueryStatus;
import shadow.palantir.driver.com.palantir.foundrysqlserver.com.palantir.foundry.sql.api.Parameters;
import shadow.palantir.driver.com.palantir.foundrysqlserver.com.palantir.foundry.sql.api.QueryId;
import shadow.palantir.driver.com.palantir.foundrysqlserver.com.palantir.foundry.sql.api.QueryStatus;
import shadow.palantir.driver.com.palantir.foundrysqlserver.com.palantir.foundry.sql.api.ReadyQueryStatus;
import shadow.palantir.driver.com.palantir.foundrysqlserver.com.palantir.foundry.sql.api.RunningQueryStatus;
import shadow.palantir.driver.com.palantir.foundrysqlserver.com.palantir.foundry.sql.api.SqlDescribeRequest;
import shadow.palantir.driver.com.palantir.foundrysqlserver.com.palantir.foundry.sql.api.SqlDialect;
import shadow.palantir.driver.com.palantir.foundrysqlserver.com.palantir.foundry.sql.api.SqlExecuteRequest;
import shadow.palantir.driver.com.palantir.foundrysqlserver.com.palantir.foundry.sql.api.SqlExecuteResponse;
import shadow.palantir.driver.com.palantir.foundrysqlserver.com.palantir.foundry.sql.api.SqlQuery;
import shadow.palantir.driver.com.palantir.foundrysqlserver.com.palantir.foundry.sql.api.SqlQueryServiceAsync;
import shadow.palantir.driver.com.palantir.foundrysqlserver.com.palantir.foundry.sql.api.SqlQueryServiceBlocking;
import shadow.palantir.driver.com.palantir.foundrysqlserver.com.palantir.foundry.sql.api.TimeoutInMillis;
import shadow.palantir.driver.com.palantir.tokens.auth.AuthHeader;
import shadow.palantir.driver.com.palantir.tracing.DeferredTracer;
import shadow.palantir.driver.com.palantir.tracing.Tracer;
import shadow.palantir.driver.one.util.streamex.StreamEx;
import shadow.palantir.driver.org.apache.commons.io.IOUtils;
import shadow.palantir.driver.org.apache.commons.io.input.AutoCloseInputStream;

/**
 * {@link QueryManager} implementation that talks to the SQL query service clients it is given.
 *
 * <p>It can describe a query's schema, execute a query, poll the query's status until it is
 * ready, open the result stream and expose the deserialized rows as a {@link QueryResult}, and
 * request cancellation of a query.
 */
public final class DefaultQueryManager implements QueryManager {
    private static final Logger log = DriverLoggerFactory.getLogger(DefaultQueryManager.class);

    /** Pool size of the executor that the public constructor creates for closing result streams. */
    private static final int DEFAULT_RESULT_CLOSE_THREADS = 8;

    private final ConnectionDetails connection;
    private final SqlQueryServiceBlocking queryService;
    private final SqlQueryServiceAsync queryServiceAsync;
    private final SqlQueryServiceRetryableDriverClient queryServiceRetryable;
    private final RemoteLogger remoteLogger;
    private final ListeningScheduledExecutorService resultCloseExecutor;
    private final TimezoneConverter timezoneConverter;

    /**
     * Creates a manager that uses its own scheduled executor (daemon threads) for closing result
     * streams.
     */
    public DefaultQueryManager(ConnectionDetails connection, SqlQueryServiceBlocking queryService, SqlQueryServiceAsync queryServiceAsync, SqlQueryServiceRetryableDriverClient queryServiceRetryable, RemoteLogger remoteLogger, TimezoneConverter timezoneConverter) {
        this(
                connection,
                queryService,
                queryServiceAsync,
                queryServiceRetryable,
                remoteLogger,
                newDefaultResultCloseExecutor(),
                timezoneConverter);
    }

    /** Same as the public constructor, but with the stream-closing executor supplied by the caller. */
    @VisibleForTesting
    DefaultQueryManager(ConnectionDetails connection, SqlQueryServiceBlocking queryService, SqlQueryServiceAsync queryServiceAsync, SqlQueryServiceRetryableDriverClient queryServiceRetryable, RemoteLogger remoteLogger, ListeningScheduledExecutorService resultCloseExecutor, TimezoneConverter timezoneConverter) {
        this.connection = connection;
        this.queryService = queryService;
        this.queryServiceAsync = queryServiceAsync;
        this.queryServiceRetryable = queryServiceRetryable;
        this.remoteLogger = remoteLogger;
        this.resultCloseExecutor = resultCloseExecutor;
        this.timezoneConverter = timezoneConverter;
    }

    /** Builds the scheduled, listening executor used when the caller does not provide one. */
    private static ListeningScheduledExecutorService newDefaultResultCloseExecutor() {
        return MoreExecutors.listeningDecorator(
                Executors.newScheduledThreadPool(
                        DEFAULT_RESULT_CLOSE_THREADS,
                        new ThreadFactoryBuilder().setDaemon(true).build()));
    }

    /**
     * Asks the query service to describe {@code query} and returns the schema after passing it
     * through {@link SchemaConverter}.
     *
     * @param authHeader supplier of the auth header sent with the request
     * @param query SQL text to describe
     * @param sqlDialect dialect the query is written in
     * @param inputParameters query parameters, sent wrapped in an {@link Optional}
     * @return the converted field schemas of the described query
     */
    @Override
    public List<FoundryFieldSchema> describeQuery(Supplier<AuthHeader> authHeader, String query, SqlDialect sqlDialect, Parameters inputParameters) {
        log.info(
                "Connection {} - Describing query. Parameters: {} Query: {}",
                Args.connectionId(connection.id()),
                UnsafeArg.of("parameters", inputParameters),
                UnsafeArg.of("query", query));
        SqlDescribeRequest request = SqlDescribeRequest.builder()
                .query(SqlQuery.of(query))
                .dialect(sqlDialect)
                .parameters(Optional.of(inputParameters))
                .fallbackBranchIds(connection.fallbackBranchIds())
                .build();
        return SchemaConverter.INSTANCE.toFriendlySchema(
                queryService.describe(authHeader.get(), request).getSchema());
    }

    /**
     * Executes {@code query} and then fetches its results.
     *
     * @throws SQLException if the query status reported while waiting is canceled, failed or unknown
     */
    @Override
    public QueryResult query(Supplier<AuthHeader> authHeader, String query, SqlDialect sqlDialect, Parameters inputParameters) throws SQLException {
        return getResults(authHeader, executeQuery(authHeader, query, sqlDialect, inputParameters));
    }

    /**
     * Submits {@code query} for execution using the connection's serialization protocol, fallback
     * branches and optional query timeout.
     *
     * @return the query id and the status returned by the execute call
     */
    @VisibleForTesting
    ExecutedQuery executeQuery(Supplier<AuthHeader> authHeader, String query, SqlDialect sqlDialect, Parameters inputParameters) {
        log.info(
                "Connection {} - Executing query. Parameters: {} Query: {}",
                Args.connectionId(connection.id()),
                UnsafeArg.of("parameters", inputParameters),
                UnsafeArg.of("query", query));
        Stopwatch executeTimer = Stopwatch.createStarted();
        SqlExecuteRequest request = SqlExecuteRequest.builder()
                .query(SqlQuery.of(query))
                .dialect(sqlDialect)
                .serializationProtocol(connection.serializationProtocol())
                .parameters(Optional.of(inputParameters))
                .fallbackBranchIds(connection.fallbackBranchIds())
                .timeout(connection.queryTimeoutMillis().map(TimeoutInMillis::of))
                .build();
        SqlExecuteResponse executed = queryService.execute(authHeader.get(), request);
        log.info("Query executed: {}. Took: {}", Args.queryId(executed.getQueryId()), SafeArg.of("time", executeTimer.stop()));
        return ExecutedQuery.of(executed.getQueryId(), executed.getStatus());
    }

    /**
     * Waits until {@code query} is ready, opens its result stream, deserializes it and wraps the
     * rows in a {@link QueryResult}.
     *
     * <p>If anything fails after the wait, the error is logged, the result stream (if one was
     * opened) is closed quietly and the failure is rethrown via {@link ExceptionUtils}.
     *
     * @param authHeader supplier of the auth header, re-read for every service call
     * @param query the executed query whose results should be fetched
     * @return the query id, row count, converted column schema and row iterator of the result
     * @throws SQLException if a polled status is canceled, failed or unknown
     */
    @VisibleForTesting
    QueryResult getResults(Supplier<AuthHeader> authHeader, ExecutedQuery query) throws SQLException {
        QueryId queryId = query.queryId();
        log.info("Connection {} - Getting query results: {}", Args.connectionId(connection.id()), Args.queryId(queryId));

        Stopwatch readyTimer = Stopwatch.createStarted();
        awaitReady(authHeader, queryId, query.initialStatus());
        log.info(
                "Connection {} - Query results ready: {}. Took: {}",
                Args.connectionId(connection.id()),
                Args.queryId(queryId),
                SafeArg.of("duration", readyTimer.stop()));

        Stopwatch fetchTimer = Stopwatch.createStarted();
        // Declared outside the try so the catch block can close whatever was opened.
        InputStream resultStream = null;
        Preconditions.checkState(Tracer.hasTraceId(), "Statement trace has not been set");
        Function<InputStream, InputStream> wrapNetworkStream =
                networkStreamWrapper(resultCloseExecutor, connection.failOnReadDelay(), Tracer.getTraceId());
        try {
            if (connection.reloadableStreams()) {
                resultStream = new ReloadingResultStream(
                        resultStreamLoader(authHeader, queryId, wrapNetworkStream), queryId, new DeferredTracer());
            } else {
                resultStream = wrapNetworkStream.apply(queryService.getResults(authHeader.get(), queryId));
            }
            CountingInputStream countedStream = new CountingInputStream(new AutoCloseInputStream(resultStream));

            Stopwatch fetchElapsed = fetchTimer.stop();
            long millisSinceExecute = query.executeTime().until(Instant.now(), ChronoUnit.MILLIS);
            log.info(
                    "Connection {} - Got result stream for {}. Took: {} Total time from opening: {} s",
                    Args.connectionId(connection.id()),
                    Args.queryId(queryId),
                    SafeArg.of("duration", fetchElapsed),
                    SafeArg.of("timeSincePrepare", Float.valueOf((float) millisSinceExecute / 1000.0f)));

            StreamingTableLatitudeResult table = LatitudeResultDeserializer.INSTANCE
                    .deserialize(countedStream)
                    .accept(AsStreamingTableLatitudeResult.INSTANCE);
            log.info(
                    "Connection {} - Deserialized result stream: {}. Starting to fetch {} rows",
                    Args.connectionId(connection.id()),
                    Args.queryId(queryId),
                    SafeArg.of("rowCount", table.getRowCount()),
                    SafeArg.of("byteCount", countedStream.getCount()));

            // Pair every column name with its type so each schema entry carries its name.
            List<FoundryFieldSchema> namedColumnTypes = StreamEx.zip(
                            table.getColumns(),
                            table.getColumnTypes(),
                            (columnName, type) -> FoundryFieldSchema.builder()
                                    .from((FoundryFieldSchema)type)
                                    .name((String)columnName)
                                    .build())
                    .toImmutableList();
            List<FoundryFieldSchema> friendlySchema = SchemaConverter.INSTANCE.toFriendlySchema(namedColumnTypes);

            // Raw type kept from the decompiled source; the element type expected by
            // FriendlyExceptionResultIterator is not visible here.
            Iterator driverRows = Iterables.transform(
                            table::rowIterator,
                            latitudeRow -> new LatitudeDriverRow(friendlySchema, (LatitudeRow)latitudeRow, this.timezoneConverter))
                    .iterator();
            FriendlyExceptionResultIterator guardedRows = new FriendlyExceptionResultIterator(
                    driverRows, Tracer.getTraceId(), connection, queryId, remoteLogger, countedStream::getCount);

            return QueryResult.builder()
                    .queryId(queryId)
                    .rowCount(table.getRowCount())
                    .columnTypes(friendlySchema)
                    .rowIterator(AutoCloseableIterator.wrap(guardedRows))
                    .build();
        } catch (Throwable e) {
            log.error("Connection {} - Error fetching results: {}", Args.connectionId(connection.id()), Args.queryId(queryId), e);
            IOUtils.closeQuietly(resultStream);
            throw (RuntimeException)ExceptionUtils.throwUnchecked(e);
        }
    }

    /**
     * Polls the query service until the status of {@code queryId} is ready, starting from
     * {@code initialStatus} (which is checked before any status request is made).
     *
     * <p>NOTE(review): no client-side delay between polls is visible in this class; presumably the
     * status call blocks or is throttled elsewhere — confirm.
     *
     * @throws SQLException if a status is canceled, failed or unknown
     */
    private void awaitReady(Supplier<AuthHeader> authHeader, QueryId queryId, QueryStatus initialStatus) throws SQLException {
        QueryStatus current = initialStatus;
        while (simplify(current) != SimpleStatus.READY) {
            current = queryService.getStatus(authHeader.get(), queryId).getStatus();
        }
    }

    /**
     * Maps {@code status} to a {@link SimpleStatus}, logging and converting the unchecked failure
     * raised for terminal error states into a {@link SQLException}.
     */
    private static SimpleStatus simplify(QueryStatus status) throws SQLException {
        try {
            return status.accept(StatusSimplifier.INSTANCE);
        } catch (UncheckedSqlException e) {
            log.error("Executing query failed", e);
            throw e.toSqlException();
        }
    }

    /**
     * Requests cancellation of {@code query} through the async client; whatever that call returns
     * is not inspected.
     */
    @Override
    public void cancel(Supplier<AuthHeader> authHeader, QueryId query) {
        log.info("Connection {} - Cancelling query: {}", Args.connectionId(connection.id()), Args.queryId(query));
        queryServiceAsync.cancel(authHeader.get(), query);
    }

    /**
     * Returns a loader that requests the results of {@code queryId} from the retryable client,
     * passing along the serialized stream token when one is present, and wraps the response
     * together with {@code streamWrapper}.
     */
    private Function<Optional<StreamToken>, StreamResponse> resultStreamLoader(Supplier<AuthHeader> authHeader, QueryId queryId, Function<InputStream, InputStream> streamWrapper) {
        return resumeToken -> {
            Optional<String> serializedToken = resumeToken.map(StreamToken::serialize);
            Response resultsResponse = queryServiceRetryable.getResults(authHeader.get(), queryId, serializedToken);
            return StreamResponse.of(resultsResponse, streamWrapper);
        };
    }

    /**
     * Returns a function that decorates a stream with an {@link AsyncCloseInputStream} (using
     * {@code streamCloseExecutor}) and then a {@link ReadDelayInputStream} configured with
     * {@code failOnReadDelay}, the system ticker, the remote logger and {@code traceId}.
     */
    private Function<InputStream, InputStream> networkStreamWrapper(ListeningScheduledExecutorService streamCloseExecutor, Optional<Duration> failOnReadDelay, String traceId) {
        return rawStream -> {
            AsyncCloseInputStream asyncClosing = new AsyncCloseInputStream(rawStream, streamCloseExecutor);
            return new ReadDelayInputStream(asyncClosing, failOnReadDelay, Ticker.systemTicker(), this.remoteLogger, traceId);
        };
    }

    /**
     * Visitor that reduces a {@link QueryStatus} to a {@link SimpleStatus}; canceled, failed and
     * unknown statuses are reported by throwing {@link UncheckedSqlException}.
     */
    private enum StatusSimplifier implements QueryStatus.Visitor<SimpleStatus> {
        INSTANCE;

        @Override
        public SimpleStatus visitCanceled(CanceledQueryStatus _canceledQueryStatus) {
            throw new UncheckedSqlException("Executing query failed. Statement was already cancelled");
        }

        @Override
        public SimpleStatus visitFailed(FailedQueryStatus failedQueryStatus) {
            // A missing error message is rendered as an empty string after the dash.
            throw new UncheckedSqlException(String.format(
                    "Executing query failed. %s - %s",
                    failedQueryStatus.getFailureReason(),
                    failedQueryStatus.getErrorMessage().orElse("")));
        }

        @Override
        public SimpleStatus visitReady(ReadyQueryStatus _readyQueryStatus) {
            return SimpleStatus.READY;
        }

        @Override
        public SimpleStatus visitRunning(RunningQueryStatus _runningQueryStatus) {
            return SimpleStatus.AWAITING_RESULT;
        }

        @Override
        public SimpleStatus visitUnknown(String _status) {
            throw new UncheckedSqlException("Executing query failed. Statement was in unknown state");
        }
    }

    /**
     * Visitor that unwraps a {@link LatitudeResultWrapper} as a {@link StreamingTableLatitudeResult}
     * and rejects the plain {@link LatitudeResult} variant.
     */
    private enum AsStreamingTableLatitudeResult implements LatitudeResultWrapper.Visitor<StreamingTableLatitudeResult> {
        INSTANCE;

        @Override
        public StreamingTableLatitudeResult visitLatitudeResult(LatitudeResult _latitudeResult) {
            throw new SafeIllegalStateException("Deserialized result was not an instance of StreamingTableLatitudeResult");
        }

        @Override
        public StreamingTableLatitudeResult visitStreamingTableLatitudeResult(StreamingTableLatitudeResult streamingTableLatitudeResult) {
            return streamingTableLatitudeResult;
        }
    }

    /** The two non-error states the polling loop distinguishes. */
    private enum SimpleStatus {
        AWAITING_RESULT,
        READY
    }
}

