/*
 * Decompiled with CFR 0.152.
 */
package org.opensearch.timeseries.ratelimit;

import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.Message;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.threadpool.ThreadPoolStats;
import org.opensearch.timeseries.AnalysisType;
import org.opensearch.timeseries.ExpiringState;
import org.opensearch.timeseries.MaintenanceState;
import org.opensearch.timeseries.NodeStateManager;
import org.opensearch.timeseries.breaker.CircuitBreakerService;
import org.opensearch.timeseries.common.exception.TimeSeriesException;
import org.opensearch.timeseries.ratelimit.QueuedRequest;
import org.opensearch.timeseries.ratelimit.RequestPriority;
import org.opensearch.timeseries.settings.TimeSeriesSettings;

public abstract class RateLimitedRequestWorker<RequestType extends QueuedRequest>
implements MaintenanceState {
    private static final Logger LOG = LogManager.getLogger(RateLimitedRequestWorker.class);
    protected volatile int queueSize;
    protected final String workerName;
    private final long heapSize;
    private final int singleRequestSize;
    private float maxHeapPercentForQueue;
    protected final ConcurrentSkipListMap<String, RequestQueue> requestQueues;
    private String lastSelectedRequestQueueId;
    protected Random random;
    private CircuitBreakerService circuitBreakerService;
    protected ThreadPool threadPool;
    protected String threadPoolName;
    protected Instant cooldownStart;
    protected int coolDownMinutes;
    private float maxQueuedTaskRatio;
    protected Clock clock;
    private float mediumRequestQueuePruneRatio;
    private float lowRequestQueuePruneRatio;
    protected int maintenanceFreqConstant;
    private final Duration stateTtl;
    protected final NodeStateManager nodeStateManager;
    protected final AnalysisType context;

    public RateLimitedRequestWorker(String workerName, long heapSizeInBytes, int singleRequestSizeInBytes, Setting<Float> maxHeapPercentForQueueSetting, ClusterService clusterService, Random random, CircuitBreakerService circuitBreakerService, ThreadPool threadPool, String threadPoolName, Settings settings, float maxQueuedTaskRatio, Clock clock, float mediumRequestQueuePruneRatio, float lowRequestQueuePruneRatio, int maintenanceFreqConstant, Duration stateTtl, NodeStateManager nodeStateManager, AnalysisType context) {
        this.heapSize = heapSizeInBytes;
        this.singleRequestSize = singleRequestSizeInBytes;
        this.maxHeapPercentForQueue = ((Float)maxHeapPercentForQueueSetting.get(settings)).floatValue();
        this.queueSize = (int)((float)heapSizeInBytes * this.maxHeapPercentForQueue / (float)singleRequestSizeInBytes);
        clusterService.getClusterSettings().addSettingsUpdateConsumer(maxHeapPercentForQueueSetting, it -> {
            int oldQueueSize = this.queueSize;
            this.maxHeapPercentForQueue = it.floatValue();
            this.queueSize = (int)((float)this.heapSize * this.maxHeapPercentForQueue / (float)this.singleRequestSize);
            LOG.info((Message)new ParameterizedMessage("Queue size changed from [{}] to [{}]", (Object)oldQueueSize, (Object)this.queueSize));
        });
        this.workerName = workerName;
        this.random = random;
        this.circuitBreakerService = circuitBreakerService;
        this.threadPool = threadPool;
        this.threadPoolName = threadPoolName;
        this.maxQueuedTaskRatio = maxQueuedTaskRatio;
        this.clock = clock;
        this.mediumRequestQueuePruneRatio = mediumRequestQueuePruneRatio;
        this.lowRequestQueuePruneRatio = lowRequestQueuePruneRatio;
        this.lastSelectedRequestQueueId = null;
        this.requestQueues = new ConcurrentSkipListMap();
        this.cooldownStart = Instant.MIN;
        this.coolDownMinutes = (int)((TimeValue)TimeSeriesSettings.COOLDOWN_MINUTES.get(settings)).getMinutes();
        this.maintenanceFreqConstant = maintenanceFreqConstant;
        this.stateTtl = stateTtl;
        this.nodeStateManager = nodeStateManager;
        this.context = context;
    }

    public String getWorkerName() {
        return this.workerName;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected Optional<BlockingQueue<RequestType>> selectNextQueue() {
        if (this.requestQueues.isEmpty()) {
            return Optional.empty();
        }
        String startId = this.lastSelectedRequestQueueId;
        try {
            Optional<BlockingQueue<RequestType>> optional;
            for (int i = 0; i < this.requestQueues.size(); ++i) {
                RequestQueue requestQueue;
                if ((startId = startId == null || this.requestQueues.size() == 1 || startId.equals(this.requestQueues.lastKey()) ? this.requestQueues.firstKey() : this.requestQueues.higherKey(startId)).equals(RequestPriority.LOW.name()) || (requestQueue = this.requestQueues.get(startId)) == null) continue;
                requestQueue.clearExpiredRequests();
                if (requestQueue.isEmpty()) continue;
                Optional optional2 = Optional.of(requestQueue.content);
                return optional2;
            }
            RequestQueue requestQueue = this.requestQueues.get(RequestPriority.LOW.name());
            if (requestQueue != null) {
                requestQueue.clearExpiredRequests();
                if (!requestQueue.isEmpty()) {
                    optional = Optional.of(requestQueue.content);
                    return optional;
                }
            }
            optional = Optional.empty();
            return optional;
        }
        finally {
            this.lastSelectedRequestQueueId = startId;
        }
    }

    protected void putOnly(RequestType request) {
        try {
            RequestQueue requestQueue = this.requestQueues.computeIfAbsent(RequestPriority.MEDIUM == ((QueuedRequest)request).getPriority() ? ((QueuedRequest)request).getConfigId() : ((QueuedRequest)request).getPriority().name(), k -> new RequestQueue());
            requestQueue.lastAccessTime = this.clock.instant();
            requestQueue.put(request);
        }
        catch (Exception e) {
            LOG.error((Message)new ParameterizedMessage("Failed to add requests to [{}]", (Object)this.workerName), (Throwable)e);
        }
    }

    private void maintainForThreadPool() {
        for (ThreadPoolStats.Stats stats : this.threadPool.stats()) {
            int maxQueueSize;
            String name = stats.getName();
            if (!"search".equals(name) && !"get".equals(name) && !"write".equals(name) || (maxQueueSize = (int)(this.maxQueuedTaskRatio * (float)this.threadPool.info(name).getQueueSize().singles())) <= 0 || stats.getQueue() <= maxQueueSize) continue;
            LOG.info((Message)new ParameterizedMessage("Queue [{}] size [{}], reached limit [{}]", new Object[]{name, stats.getQueue(), maxQueueSize}));
            this.setCoolDownStart();
            break;
        }
    }

    private void prune(Map<String, RequestQueue> requestQueues) {
        this.pruneExpired();
        for (Map.Entry<String, RequestQueue> requestQueueEntry : requestQueues.entrySet()) {
            RequestQueue requestQueue;
            if (requestQueueEntry.getKey().equals(RequestPriority.HIGH.name()) || (requestQueue = requestQueueEntry.getValue()) == null || requestQueue.isEmpty()) continue;
            if (requestQueueEntry.getKey().equals(RequestPriority.LOW.name())) {
                requestQueue.drain(this.lowRequestQueuePruneRatio);
                continue;
            }
            requestQueue.drain(this.mediumRequestQueuePruneRatio);
        }
    }

    private int pruneExpired() {
        int deleted = 0;
        for (Map.Entry<String, RequestQueue> requestQueueEntry : this.requestQueues.entrySet()) {
            RequestQueue requestQueue = requestQueueEntry.getValue();
            if (requestQueue == null) continue;
            deleted += requestQueue.clearExpiredRequests();
        }
        return deleted;
    }

    private void prune(Map<String, RequestQueue> requestQueues, int exceededSize) {
        int leftItemsToRemove = exceededSize - this.pruneExpired();
        if (leftItemsToRemove <= 0) {
            return;
        }
        int numberOfQueuesToExclude = 0;
        RequestQueue requestQueue = requestQueues.get(RequestPriority.LOW.name());
        if (requestQueue != null) {
            int removedFromLow = requestQueue.drain(leftItemsToRemove);
            if (removedFromLow >= leftItemsToRemove) {
                return;
            }
            ++numberOfQueuesToExclude;
            leftItemsToRemove -= removedFromLow;
        }
        if (requestQueues.get(RequestPriority.HIGH.name()) != null) {
            ++numberOfQueuesToExclude;
        }
        int numberOfRequestsToRemoveInMediumQueues = leftItemsToRemove / (requestQueues.size() - numberOfQueuesToExclude);
        for (Map.Entry<String, RequestQueue> requestQueueEntry : requestQueues.entrySet()) {
            if (requestQueueEntry.getKey().equals(RequestPriority.HIGH.name()) || requestQueueEntry.getKey().equals(RequestPriority.LOW.name()) || (requestQueue = requestQueueEntry.getValue()) == null) continue;
            requestQueue.drain(numberOfRequestsToRemoveInMediumQueues);
        }
    }

    private void maintainForMemory() {
        this.maintenance(this.requestQueues, this.stateTtl);
        int exceededSize = this.exceededSize();
        if (exceededSize > 0) {
            this.prune(this.requestQueues, exceededSize);
        } else if (this.circuitBreakerService.isOpen().booleanValue()) {
            this.prune(this.requestQueues);
        }
    }

    private int exceededSize() {
        Collection<RequestQueue> queues = this.requestQueues.values();
        int totalSize = 0;
        for (RequestQueue q : queues) {
            totalSize += q.size();
        }
        return totalSize - this.queueSize;
    }

    public boolean isQueueEmpty() {
        Collection<RequestQueue> queues = this.requestQueues.values();
        for (RequestQueue q : queues) {
            if (q.size() <= 0) continue;
            return false;
        }
        return true;
    }

    @Override
    public void maintenance() {
        try {
            this.maintainForMemory();
            this.maintainForThreadPool();
        }
        catch (Exception e) {
            LOG.warn("Failed to maintain", (Throwable)e);
        }
    }

    protected void setCoolDownStart() {
        this.cooldownStart = this.clock.instant();
    }

    protected List<RequestType> getRequests(int batchSize) {
        BlockingQueue<RequestType> nextToProcess;
        Optional<BlockingQueue<RequestType>> queue;
        ArrayList toProcess = new ArrayList(batchSize);
        HashSet<BlockingQueue<RequestType>> selectedQueue = new HashSet<BlockingQueue<RequestType>>();
        while (toProcess.size() < batchSize && (queue = this.selectNextQueue()).isPresent() && !selectedQueue.contains(nextToProcess = queue.get())) {
            selectedQueue.add(nextToProcess);
            ArrayList requests = new ArrayList();
            nextToProcess.drainTo(requests, batchSize);
            toProcess.addAll(requests);
        }
        return toProcess;
    }

    public void put(RequestType request) {
        if (request == null) {
            return;
        }
        this.putOnly(request);
        this.process();
    }

    public void putAll(List<RequestType> requests) {
        if (requests == null || requests.isEmpty()) {
            return;
        }
        try {
            for (QueuedRequest request : requests) {
                this.putOnly(request);
            }
            this.process();
        }
        catch (Exception e) {
            LOG.error((Message)new ParameterizedMessage("Failed to add requests to [{}]", (Object)this.getWorkerName()), (Throwable)e);
        }
    }

    protected void process() {
        block5: {
            if (this.random.nextInt(this.maintenanceFreqConstant) == 1) {
                this.maintenance();
            }
            if (this.cooldownStart.plus(Duration.ofMinutes(this.coolDownMinutes)).isAfter(this.clock.instant())) {
                this.threadPool.schedule(() -> {
                    try {
                        this.process();
                    }
                    catch (Exception e) {
                        LOG.error((Message)new ParameterizedMessage("Fail to process requests in [{}].", (Object)this.workerName), (Throwable)e);
                    }
                }, new TimeValue((long)this.coolDownMinutes, TimeUnit.MINUTES), this.threadPoolName);
            } else {
                try {
                    this.triggerProcess();
                }
                catch (Exception e) {
                    LOG.error(String.format(Locale.ROOT, "Failed to process requests from %s", this.getWorkerName()), (Throwable)e);
                    if (e == null || !(e instanceof TimeSeriesException)) break block5;
                    TimeSeriesException adExep = (TimeSeriesException)e;
                    this.nodeStateManager.setException(adExep.getConfigId(), adExep);
                }
            }
        }
    }

    public boolean hasConfigId(String configId) {
        for (Map.Entry<String, RequestQueue> requestQueueEntry : this.requestQueues.entrySet()) {
            RequestQueue requests;
            String requestId = requestQueueEntry.getKey();
            if (!(requestId.equals(RequestPriority.LOW.name()) || requestId.equals(RequestPriority.HIGH.name()) ? (requests = requestQueueEntry.getValue()).hasConfigId(configId) : requestId.equals(configId))) continue;
            return true;
        }
        return false;
    }

    protected abstract void triggerProcess();

    class RequestQueue
    implements ExpiringState {
        private Instant lastAccessTime;
        private final BlockingQueue<RequestType> content;

        RequestQueue() {
            this.lastAccessTime = RateLimitedRequestWorker.this.clock.instant();
            this.content = new LinkedBlockingQueue();
        }

        @Override
        public boolean expired(Duration stateTtl) {
            return this.expired(this.lastAccessTime, stateTtl, RateLimitedRequestWorker.this.clock.instant());
        }

        public void put(RequestType request) throws InterruptedException {
            this.content.put(request);
        }

        public int size() {
            return this.content.size();
        }

        public boolean isEmpty() {
            return this.content.size() == 0;
        }

        public int drain(int numberToRemove) {
            int removed;
            for (removed = 0; removed <= numberToRemove && this.content.poll() != null; ++removed) {
            }
            return removed;
        }

        public int drain(float removeRatio) {
            int numberToRemove = (int)((float)this.content.size() * removeRatio);
            return this.drain(numberToRemove);
        }

        public int clearExpiredRequests() {
            int removed = 0;
            QueuedRequest head = (QueuedRequest)this.content.peek();
            while (head != null && head.getExpirationEpochMs() < RateLimitedRequestWorker.this.clock.millis()) {
                this.content.poll();
                ++removed;
                head = (QueuedRequest)this.content.peek();
            }
            return removed;
        }

        public boolean hasConfigId(String configId) {
            for (QueuedRequest request : this.content) {
                if (!configId.equals(request.getConfigId())) continue;
                return true;
            }
            return false;
        }
    }
}

