/*
 * Decompiled with CFR 0.152.
 */
package org.opensearch.sql.spark.execution.statestore;

import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.Locale;
import java.util.Optional;
import java.util.function.Supplier;
import lombok.Generated;
import org.apache.commons.io.IOUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.action.DocWriteResponse;
import org.opensearch.action.admin.indices.create.CreateIndexRequest;
import org.opensearch.action.admin.indices.create.CreateIndexResponse;
import org.opensearch.action.delete.DeleteRequest;
import org.opensearch.action.delete.DeleteResponse;
import org.opensearch.action.get.GetRequest;
import org.opensearch.action.get.GetResponse;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.action.index.IndexResponse;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.action.support.WriteRequest;
import org.opensearch.action.update.UpdateRequest;
import org.opensearch.action.update.UpdateResponse;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.action.ActionFuture;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.common.xcontent.LoggingDeprecationHandler;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.core.xcontent.DeprecationHandler;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.core.xcontent.ToXContent;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.index.query.QueryBuilder;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.search.builder.SearchSourceBuilder;
import org.opensearch.sql.spark.asyncquery.model.AsyncQueryJobMetadata;
import org.opensearch.sql.spark.dispatcher.model.IndexDMLResult;
import org.opensearch.sql.spark.execution.session.SessionModel;
import org.opensearch.sql.spark.execution.session.SessionState;
import org.opensearch.sql.spark.execution.session.SessionType;
import org.opensearch.sql.spark.execution.statement.StatementModel;
import org.opensearch.sql.spark.execution.statement.StatementState;
import org.opensearch.sql.spark.execution.statestore.CopyBuilder;
import org.opensearch.sql.spark.execution.statestore.FromXContent;
import org.opensearch.sql.spark.execution.statestore.OpenSearchStateStoreUtil;
import org.opensearch.sql.spark.execution.statestore.StateCopyBuilder;
import org.opensearch.sql.spark.execution.statestore.StateModel;
import org.opensearch.sql.spark.execution.xcontent.AsyncQueryJobMetadataXContentSerializer;
import org.opensearch.sql.spark.execution.xcontent.FlintIndexStateModelXContentSerializer;
import org.opensearch.sql.spark.execution.xcontent.IndexDMLResultXContentSerializer;
import org.opensearch.sql.spark.execution.xcontent.SessionModelXContentSerializer;
import org.opensearch.sql.spark.execution.xcontent.StatementModelXContentSerializer;
import org.opensearch.sql.spark.execution.xcontent.XContentSerializer;
import org.opensearch.sql.spark.execution.xcontent.XContentSerializerUtil;
import org.opensearch.sql.spark.flint.FlintIndexState;
import org.opensearch.sql.spark.flint.FlintIndexStateModel;
import org.opensearch.transport.client.Client;
import shaded.com.google.common.annotations.VisibleForTesting;

public class StateStore {
    /** Classpath resource (YAML) whose content is used as the index settings when an index is created. */
    public static final String SETTINGS_FILE_NAME = "query_execution_request_settings.yml";
    /** Classpath resource (YAML) whose content is used as the index mapping when an index is created. */
    public static final String MAPPING_FILE_NAME = "query_execution_request_mapping.yml";
    /** Wildcard datasource name. Not referenced inside this class; presumably means "every datasource" to callers (TODO confirm). */
    public static final String ALL_DATASOURCE = "*";
    private static final Logger LOG = LogManager.getLogger();
    /** Client through which every index, get, update, delete, search and create-index request is sent. */
    private final Client client;
    /** Consulted (through its routing table) to check whether an index already exists. */
    private final ClusterService clusterService;

    /**
     * Writes {@code st} as a new document, creating the index first when the cluster routing
     * table does not list it.
     *
     * <p>The request is issued with {@code create(true)}, carries the sequence number and primary
     * term found in the model's metadata, and uses the {@code WAIT_UNTIL} refresh policy. It is
     * executed with the thread context stashed.
     *
     * @param docId id of the document to write
     * @param st model to serialize and store
     * @param builder produces the returned copy of {@code st} with fresh metadata
     * @param indexName index that receives the document
     * @param <T> concrete state model type
     * @return copy of {@code st} whose metadata is built from the sequence number and primary term
     *     reported by the index response
     * @throws RuntimeException if the write result is anything other than {@code CREATED}, or
     *     wrapping an {@link IOException} raised while serializing the model
     */
    @VisibleForTesting
    public <T extends StateModel> T create(String docId, T st, CopyBuilder<T> builder, String indexName) {
        // Provision the backing index on first use.
        if (!clusterService.state().routingTable().hasIndex(indexName)) {
            createIndex(indexName);
        }
        try {
            XContentSerializer<T> xContentSerializer = getXContentSerializer(st);

            IndexRequest request = new IndexRequest(indexName);
            request.id(docId);
            request.source(xContentSerializer.toXContent(st, ToXContent.EMPTY_PARAMS));
            request.setIfSeqNo(getSeqNo(st));
            request.setIfPrimaryTerm(getPrimaryTerm(st));
            request.create(true);
            request.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);

            try (ThreadContext.StoredContext ignored = client.threadPool().getThreadContext().stashContext()) {
                IndexResponse response = client.index(request).actionGet();
                DocWriteResponse.Result outcome = response.getResult();
                if (outcome != DocWriteResponse.Result.CREATED) {
                    throw new RuntimeException(String.format(Locale.ROOT, "Failed create doc. id: %s, error: %s", st.getId(), outcome.getLowercase()));
                }
                LOG.debug("Successfully created doc. id: {}", st.getId());
                return builder.of(st, XContentSerializerUtil.buildMetadata(response.getSeqNo(), response.getPrimaryTerm()));
            }
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Fetches the document {@code sid} from {@code indexName} and deserializes it.
     *
     * <p>When the cluster routing table does not list the index, the index is created as a side
     * effect and an empty result is returned without issuing a get request. Otherwise the get
     * request is sent with {@code refresh(true)} and with the thread context stashed.
     *
     * @param sid id of the document to read
     * @param builder turns the parsed source plus sequence number and primary term into a model
     * @param indexName index to read from
     * @param <T> concrete state model type
     * @return the deserialized model, or {@link Optional#empty()} when the index or the document
     *     does not exist
     * @throws RuntimeException wrapping an {@link IOException} raised while parsing the source
     */
    @VisibleForTesting
    public <T extends StateModel> Optional<T> get(String sid, FromXContent<T> builder, String indexName) {
        try {
            if (!clusterService.state().routingTable().hasIndex(indexName)) {
                // The index did not exist, so the document cannot exist either.
                createIndex(indexName);
                return Optional.empty();
            }
            GetRequest getRequest = new GetRequest().index(indexName).id(sid).refresh(true);
            try (ThreadContext.StoredContext ignored = client.threadPool().getThreadContext().stashContext()) {
                GetResponse getResponse = client.get(getRequest).actionGet();
                if (!getResponse.isExists()) {
                    return Optional.empty();
                }
                // The parser is Closeable; release it once the model has been built.
                try (XContentParser parser = XContentType.JSON.xContent().createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, getResponse.getSourceAsString())) {
                    // Advance to the first token before handing the parser to the builder.
                    parser.nextToken();
                    return Optional.of(builder.fromXContent(parser, getResponse.getSeqNo(), getResponse.getPrimaryTerm()));
                }
            }
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Persists a state change for {@code st}.
     *
     * <p>A copy of {@code st} carrying {@code state} and the existing metadata is serialized and
     * sent as a partial-document update. The update is conditioned on the sequence number and
     * primary term held in that metadata, requests the source back, uses the {@code WAIT_UNTIL}
     * refresh policy, and runs with the thread context stashed.
     *
     * @param st current model
     * @param state new state value
     * @param builder creates copies of the model with a given state and metadata
     * @param indexName index holding the document
     * @param <T> concrete state model type
     * @param <S> state type
     * @return copy of the model with {@code state} and metadata built from the sequence number and
     *     primary term reported by the update response
     * @throws RuntimeException wrapping an {@link IOException} raised while serializing the model
     */
    @VisibleForTesting
    public <T extends StateModel, S> T updateState(T st, S state, StateCopyBuilder<T, S> builder, String indexName) {
        try {
            T updated = builder.of(st, state, st.getMetadata());
            XContentSerializer<T> xContentSerializer = getXContentSerializer(st);

            UpdateRequest request = new UpdateRequest();
            request.index(indexName);
            request.id(updated.getId());
            request.setIfSeqNo(getSeqNo(updated));
            request.setIfPrimaryTerm(getPrimaryTerm(updated));
            request.doc(xContentSerializer.toXContent(updated, ToXContent.EMPTY_PARAMS));
            request.fetchSource(true);
            request.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);

            try (ThreadContext.StoredContext ignored = client.threadPool().getThreadContext().stashContext()) {
                UpdateResponse response = client.update(request).actionGet();
                LOG.debug("Successfully update doc. id: {}", st.getId());
                return builder.of(updated, state, XContentSerializerUtil.buildMetadata(response.getSeqNo(), response.getPrimaryTerm()));
            }
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Reads the {@code "seqNo"} metadata item of {@code model}.
     *
     * @param model model whose metadata is inspected
     * @return the stored sequence number, or {@code -2} when the item is absent (this appears to
     *     match OpenSearch's "unassigned sequence number" sentinel; confirm against
     *     {@code SequenceNumbers})
     */
    private long getSeqNo(StateModel model) {
        final long fallbackSeqNo = -2L;
        return model.getMetadataItem("seqNo", Long.class).orElse(fallbackSeqNo);
    }

    /**
     * Reads the {@code "primaryTerm"} metadata item of {@code model}.
     *
     * @param model model whose metadata is inspected
     * @return the stored primary term, or {@code 0} when the item is absent (this appears to match
     *     OpenSearch's "unassigned primary term" sentinel; confirm against {@code SequenceNumbers})
     */
    private long getPrimaryTerm(StateModel model) {
        final long fallbackPrimaryTerm = 0L;
        return model.getMetadataItem("primaryTerm", Long.class).orElse(fallbackPrimaryTerm);
    }

    /**
     * Deletes the document {@code sid} from {@code indexName}.
     *
     * <p>A missing index is treated as success because there is nothing to delete. The delete
     * request runs with the thread context stashed; the stashed context is restored through
     * try-with-resources on every path.
     *
     * @param sid id of the document to delete
     * @param indexName index holding the document
     * @return {@code true} when the index does not exist or the delete result is {@code DELETED};
     *     {@code false} for any other delete result
     * @throws RuntimeException wrapping any exception raised while deleting the document
     */
    @VisibleForTesting
    public boolean delete(String sid, String indexName) {
        // No action needed when the index does not exist.
        if (!clusterService.state().routingTable().hasIndex(indexName)) {
            return true;
        }
        try (ThreadContext.StoredContext ignored = client.threadPool().getThreadContext().stashContext()) {
            DeleteRequest deleteRequest = new DeleteRequest(indexName, sid);
            DeleteResponse deleteResponse = client.delete(deleteRequest).actionGet();
            return deleteResponse.getResult() == DocWriteResponse.Result.DELETED;
        } catch (Exception e) {
            throw new RuntimeException(String.format(Locale.ROOT, "Failed to delete index state doc %s in index %s", sid, indexName), e);
        }
    }

    /**
     * Creates {@code indexName} using the mapping and settings loaded from the classpath
     * resources {@link #MAPPING_FILE_NAME} and {@link #SETTINGS_FILE_NAME} (both YAML).
     *
     * <p>The thread context is stashed only while the request is submitted; the response is
     * awaited after the context has been restored.
     *
     * @param indexName name of the index to create
     * @throws RuntimeException if loading the resources or creating the index fails, or if the
     *     creation is not acknowledged; the original exception is attached as the cause
     */
    private void createIndex(String indexName) {
        try {
            CreateIndexRequest createIndexRequest = new CreateIndexRequest(indexName);
            createIndexRequest.mapping(loadConfigFromResource(MAPPING_FILE_NAME), XContentType.YAML).settings(loadConfigFromResource(SETTINGS_FILE_NAME), XContentType.YAML);
            ActionFuture<CreateIndexResponse> createIndexResponseActionFuture;
            try (ThreadContext.StoredContext ignored = client.threadPool().getThreadContext().stashContext()) {
                createIndexResponseActionFuture = client.admin().indices().create(createIndexRequest);
            }
            CreateIndexResponse createIndexResponse = createIndexResponseActionFuture.actionGet();
            if (!createIndexResponse.isAcknowledged()) {
                throw new RuntimeException("Index creation is not acknowledged.");
            }
            LOG.info("Index: {} creation Acknowledged", indexName);
        } catch (Exception e) {
            // Keep the cause so the original stack trace is not lost; JVM Errors are left to propagate.
            throw new RuntimeException("Internal server error while creating" + indexName + " index:: " + e.getMessage(), e);
        }
    }

    /**
     * Counts the documents in {@code indexName} that match {@code query}.
     *
     * <p>The search requests zero hits (only the total is used), sets the
     * {@code _primary_first} preference, and is submitted with the thread context stashed; the
     * response is awaited after the context has been restored.
     *
     * @param indexName index to search
     * @param query query the counted documents must match
     * @return total hit count reported by the search response
     * @throws RuntimeException if the response status code is not 200
     */
    @VisibleForTesting
    public long count(String indexName, QueryBuilder query) {
        SearchSourceBuilder source = new SearchSourceBuilder();
        source.query(query);
        source.size(0);

        SearchRequest request = new SearchRequest();
        request.indices(indexName);
        request.preference("_primary_first");
        request.source(source);

        ActionFuture<SearchResponse> pending;
        try (ThreadContext.StoredContext ignored = client.threadPool().getThreadContext().stashContext()) {
            pending = client.search(request);
        }
        SearchResponse response = pending.actionGet();
        if (response.status().getStatus() == 200) {
            return response.getHits().getTotalHits().value();
        }
        throw new RuntimeException("Fetching job metadata information failed with status : " + response.status());
    }

    /**
     * Reads a classpath resource fully as UTF-8 text.
     *
     * @param fileName resource name, resolved through this class's class loader
     * @return the resource content
     * @throws IOException if the resource cannot be found or cannot be read
     */
    private String loadConfigFromResource(String fileName) throws IOException {
        // try-with-resources closes the stream on every path; a null resource is simply skipped on close.
        try (InputStream fileStream = StateStore.class.getClassLoader().getResourceAsStream(fileName)) {
            if (fileStream == null) {
                // getResourceAsStream signals "not found" with null; fail with a clear message instead of an NPE.
                throw new IOException("Resource not found on classpath: " + fileName);
            }
            return IOUtils.toString(fileStream, StandardCharsets.UTF_8);
        }
    }

    /**
     * Builds a supplier that, each time it is invoked, counts documents of type {@code session}
     * whose {@code sessionType} is the interactive session type and whose {@code state} is the
     * running session state, in the index derived from {@code datasourceName}.
     *
     * @param stateStore store used to run the count
     * @param datasourceName datasource whose index is queried
     * @return supplier of the current count; the query runs on every call
     */
    public static Supplier<Long> activeSessionsCount(StateStore stateStore, String datasourceName) {
        return () -> {
            String indexName = OpenSearchStateStoreUtil.getIndexName(datasourceName);
            QueryBuilder runningInteractiveSessions = QueryBuilders.boolQuery()
                .must(QueryBuilders.termQuery("type", "session"))
                .must(QueryBuilders.termQuery("sessionType", SessionType.INTERACTIVE.getSessionType()))
                .must(QueryBuilders.termQuery("state", SessionState.RUNNING.getSessionState()));
            return stateStore.count(indexName, runningInteractiveSessions);
        };
    }

    /**
     * Builds a supplier that, each time it is invoked, counts documents of type
     * {@code flintindexstate} whose {@code state} is the refreshing state, in the index derived
     * from {@code datasourceName}.
     *
     * @param stateStore store used to run the count
     * @param datasourceName datasource whose index is queried
     * @return supplier of the current count; the query runs on every call
     */
    public static Supplier<Long> activeRefreshJobCount(StateStore stateStore, String datasourceName) {
        return () -> {
            String indexName = OpenSearchStateStoreUtil.getIndexName(datasourceName);
            QueryBuilder refreshingIndexes = QueryBuilders.boolQuery()
                .must(QueryBuilders.termQuery("type", "flintindexstate"))
                .must(QueryBuilders.termQuery("state", FlintIndexState.REFRESHING.getState()));
            return stateStore.count(indexName, refreshingIndexes);
        };
    }

    /**
     * Builds a supplier that, each time it is invoked, counts documents of type {@code statement}
     * whose {@code state} is either the running or the waiting statement state, in the index
     * derived from {@code datasourceName}.
     *
     * <p>The state restriction is a {@code must} clause. As a {@code should} clause next to a
     * {@code must} clause it would be optional (the default minimum-should-match is then 0) and
     * would only affect scoring, so statements in every state would be counted.
     *
     * @param stateStore store used to run the count
     * @param datasourceName datasource whose index is queried
     * @return supplier of the current count; the query runs on every call
     */
    public static Supplier<Long> activeStatementsCount(StateStore stateStore, String datasourceName) {
        return () -> {
            String indexName = OpenSearchStateStoreUtil.getIndexName(datasourceName);
            QueryBuilder activeStatements = QueryBuilders.boolQuery()
                .must(QueryBuilders.termQuery("type", "statement"))
                .must(QueryBuilders.termsQuery("state", StatementState.RUNNING.getState(), StatementState.WAITING.getState()));
            return stateStore.count(indexName, activeStatements);
        };
    }

    /**
     * Picks the serializer that matches the runtime type of {@code st}.
     *
     * @param st model to be serialized
     * @param <T> concrete state model type
     * @return a new serializer instance for the model's type
     * @throws IllegalArgumentException if {@code st} is not one of the supported model types
     */
    private <T extends StateModel> XContentSerializer<T> getXContentSerializer(T st) {
        final XContentSerializer<?> serializer;
        if (st instanceof StatementModel) {
            serializer = new StatementModelXContentSerializer();
        } else if (st instanceof SessionModel) {
            serializer = new SessionModelXContentSerializer();
        } else if (st instanceof FlintIndexStateModel) {
            serializer = new FlintIndexStateModelXContentSerializer();
        } else if (st instanceof AsyncQueryJobMetadata) {
            serializer = new AsyncQueryJobMetadataXContentSerializer();
        } else if (st instanceof IndexDMLResult) {
            serializer = new IndexDMLResultXContentSerializer();
        } else {
            throw new IllegalArgumentException("Unsupported StateModel subclass: " + st.getClass().getSimpleName());
        }
        // The serializer was selected from the runtime type of st, so it accepts st; the
        // compiler cannot prove that, hence the single, narrowly scoped unchecked cast.
        @SuppressWarnings("unchecked")
        XContentSerializer<T> typed = (XContentSerializer<T>) serializer;
        return typed;
    }

    /**
     * Creates a state store backed by the given client and cluster service.
     *
     * @param client client used to send every request issued by this store
     * @param clusterService source of the cluster state consulted for index-existence checks
     */
    @Generated
    public StateStore(Client client, ClusterService clusterService) {
        this.clusterService = clusterService;
        this.client = client;
    }
}

