/*
 * Decompiled with CFR 0.152.
 */
package org.opensearch.snapshots;

import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.Map;
import java.util.Objects;
import java.util.Queue;
import java.util.Set;
import java.util.function.Supplier;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.cluster.ClusterChangedEvent;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.ClusterStateListener;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.routing.RecoverySource;
import org.opensearch.cluster.routing.RerouteService;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.ShardRoutingState;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.Priority;
import org.opensearch.common.annotation.PublicApi;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.concurrent.AbstractRunnable;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.repositories.IndexId;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.Repository;
import org.opensearch.snapshots.Snapshot;
import org.opensearch.snapshots.SnapshotShardSizeInfo;
import org.opensearch.snapshots.SnapshotsInfoService;
import org.opensearch.threadpool.ThreadPool;

public class InternalSnapshotsInfoService
implements ClusterStateListener,
SnapshotsInfoService {
    public static final Setting<Integer> INTERNAL_SNAPSHOT_INFO_MAX_CONCURRENT_FETCHES_SETTING = Setting.intSetting("cluster.snapshot.info.max_concurrent_fetches", 5, 1, Setting.Property.Dynamic, Setting.Property.NodeScope);
    private static final Logger logger = LogManager.getLogger(InternalSnapshotsInfoService.class);
    private static final ActionListener<ClusterState> REROUTE_LISTENER = ActionListener.wrap(r -> logger.trace("reroute after snapshot shard size update completed"), e -> logger.debug("reroute after snapshot shard size update failed", (Throwable)e));
    private final ThreadPool threadPool;
    private final Supplier<RepositoriesService> repositoriesService;
    private final Supplier<RerouteService> rerouteService;
    private volatile Map<SnapshotShard, Long> knownSnapshotShards;
    private volatile boolean isMaster;
    private final Set<SnapshotShard> unknownSnapshotShards;
    private final Queue<SnapshotShard> queue;
    private final Set<SnapshotShard> failedSnapshotShards;
    private volatile int maxConcurrentFetches;
    private int activeFetches;
    private final Object mutex;

    /**
     * Creates the service with empty bookkeeping and hooks it into the cluster service.
     *
     * <p>The thread pool is taken from the cluster applier service. The initial fetch limit is read from
     * {@link #INTERNAL_SNAPSHOT_INFO_MAX_CONCURRENT_FETCHES_SETTING} and a settings-update consumer is
     * registered for it. The service only subscribes to cluster state changes when
     * {@code DiscoveryNode.isClusterManagerNode(settings)} returns {@code true}.
     *
     * @param settings                    node settings used for the initial fetch limit and the node-role check
     * @param clusterService              source of the thread pool, the cluster settings and cluster state events
     * @param repositoriesServiceSupplier supplier resolved when a snapshot shard size is fetched
     * @param rerouteServiceSupplier      supplier resolved when a reroute is requested after a fetch
     */
    public InternalSnapshotsInfoService(Settings settings, ClusterService clusterService, Supplier<RepositoriesService> repositoriesServiceSupplier, Supplier<RerouteService> rerouteServiceSupplier) {
        threadPool = clusterService.getClusterApplierService().threadPool();
        repositoriesService = repositoriesServiceSupplier;
        rerouteService = rerouteServiceSupplier;

        // Bookkeeping starts out empty; every structure below is guarded by the mutex.
        mutex = new Object();
        knownSnapshotShards = Map.of();
        unknownSnapshotShards = new LinkedHashSet<>();
        failedSnapshotShards = new LinkedHashSet<>();
        queue = new LinkedList<>();
        activeFetches = 0;

        maxConcurrentFetches = INTERNAL_SNAPSHOT_INFO_MAX_CONCURRENT_FETCHES_SETTING.get(settings);
        clusterService.getClusterSettings()
            .addSettingsUpdateConsumer(INTERNAL_SNAPSHOT_INFO_MAX_CONCURRENT_FETCHES_SETTING, this::setMaxConcurrentFetches);

        if (DiscoveryNode.isClusterManagerNode(settings)) {
            clusterService.addListener(this);
        }
    }

    /**
     * Settings-update callback: stores the new upper bound for concurrently running size fetches.
     *
     * @param newLimit the updated value of the max-concurrent-fetches setting
     */
    private void setMaxConcurrentFetches(Integer newLimit) {
        maxConcurrentFetches = newLimit;
    }

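    /*
     * Decompiler note (CFR): a try block that caught itself was removed while reconstructing this
     * method - most likely the compiler-generated handler that releases the monitor of the
     * synchronized block below. Behaviour may differ from the original; verify against the source.
     */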
    /**
     * Returns a snapshot of the shard sizes collected so far.
     *
     * <p>The result is built under the mutex from a copy of the known sizes; every shard whose fetch
     * failed is added with a size of {@code -1}.
     *
     * @return a new {@link SnapshotShardSizeInfo} wrapping the copied map
     */
    @Override
    public SnapshotShardSizeInfo snapshotShardSizes() {
        synchronized (mutex) {
            final Map<SnapshotShard, Long> sizes = new HashMap<>(knownSnapshotShards);
            for (SnapshotShard failedShard : failedSnapshotShards) {
                final Long previous = sizes.put(failedShard, -1L);
                assert previous == null : "snapshot shard size already known for " + failedShard;
            }
            return new SnapshotShardSizeInfo(sizes);
        }
    }

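    /*
     * Decompiler note (CFR): a try block that caught itself was removed while reconstructing this
     * method - most likely the compiler-generated handlers that release the monitors of the
     * synchronized blocks below. Behaviour may differ from the original; verify against the source.
     */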
    /**
     * Reacts to a cluster state change.
     *
     * <ul>
     *   <li>Local node is cluster manager: every snapshot-recovering shard that is neither known, failed
     *       nor already pending is queued, stale entries are pruned, and up to
     *       {@code min(newly queued, maxConcurrentFetches)} fetch attempts are kicked off.</li>
     *   <li>Local node was cluster manager in the previous state: known and failed results are dropped
     *       and the queue is drained; shards currently being fetched stay in the unknown set.</li>
     *   <li>Otherwise: only asserts that no state is lingering.</li>
     * </ul>
     *
     * @param event the cluster change notification
     */
    @Override
    public void clusterChanged(ClusterChangedEvent event) {
        if (event.localNodeClusterManager()) {
            final Set<SnapshotShard> requiredShards = listOfSnapshotShards(event.state());
            int newlyQueued = 0;
            synchronized (mutex) {
                isMaster = true;
                for (SnapshotShard candidate : requiredShards) {
                    final boolean alreadyResolved = knownSnapshotShards.containsKey(candidate) || failedSnapshotShards.contains(candidate);
                    // add() returns false when the shard is already pending, so it is queued only once
                    if (!alreadyResolved && unknownSnapshotShards.add(candidate)) {
                        queue.add(candidate);
                        newlyQueued++;
                    }
                }
                cleanUpSnapshotShardSizes(requiredShards);
            }
            // Fetchers are started outside the lock; each one re-acquires it in fetchNextSnapshotShard().
            final int fetchers = Math.min(newlyQueued, maxConcurrentFetches);
            for (int started = 0; started < fetchers; started++) {
                fetchNextSnapshotShard();
            }
            return;
        }

        if (event.previousState().nodes().isLocalNodeElectedClusterManager()) {
            synchronized (mutex) {
                knownSnapshotShards = Map.of();
                failedSnapshotShards.clear();
                isMaster = false;
                // Only queued (not yet started) shards are removed from the unknown set here.
                for (SnapshotShard pending = queue.poll(); pending != null; pending = queue.poll()) {
                    final boolean removed = unknownSnapshotShards.remove(pending);
                    assert removed : "snapshot shard to remove does not exist " + pending;
                }
                assert invariant();
            }
            return;
        }

        synchronized (mutex) {
            assert unknownSnapshotShards.isEmpty() || unknownSnapshotShards.size() == activeFetches;
            assert knownSnapshotShards.isEmpty();
            assert failedSnapshotShards.isEmpty();
            assert !isMaster;
            assert queue.isEmpty();
        }
    }

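    /*
     * Decompiler note (CFR): a try block that caught itself was removed while reconstructing this
     * method - most likely the compiler-generated handler that releases the monitor of the
     * synchronized block below. Behaviour may differ from the original; verify against the source.
     */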
    /**
     * Starts the fetch for the next queued snapshot shard, if the concurrency limit allows it.
     *
     * <p>Under the mutex: when fewer than {@code maxConcurrentFetches} fetches are active, the head of the
     * queue is polled; if there is one, the active counter is incremented and a
     * {@link FetchingSnapshotShardSizeRunnable} is submitted to the generic executor.
     */
    private void fetchNextSnapshotShard() {
        synchronized (mutex) {
            // The queue is only polled when there is capacity, so nothing is dequeued and then dropped.
            if (activeFetches < maxConcurrentFetches) {
                final SnapshotShard next = queue.poll();
                if (next != null) {
                    activeFetches++;
                    threadPool.generic().execute(new FetchingSnapshotShardSizeRunnable(next));
                }
            }
            assert invariant();
        }
    }

    /**
     * Drops known sizes and recorded failures for shards that are no longer required.
     *
     * <p>Must be called while holding the mutex. The known-sizes map is only replaced (by a new
     * unmodifiable copy) when at least one of its keys is missing from {@code requiredSnapshotShards}.
     *
     * @param requiredSnapshotShards the snapshot shards that should be kept
     */
    private void cleanUpSnapshotShardSizes(Set<SnapshotShard> requiredSnapshotShards) {
        assert Thread.holdsLock(mutex);
        final Map<SnapshotShard, Long> current = knownSnapshotShards;
        if (!requiredSnapshotShards.containsAll(current.keySet())) {
            // Copy-then-publish: the published map itself is never mutated.
            final Map<SnapshotShard, Long> retained = new HashMap<>(current);
            retained.keySet().retainAll(requiredSnapshotShards);
            knownSnapshotShards = Collections.unmodifiableMap(retained);
        }
        failedSnapshotShards.retainAll(requiredSnapshotShards);
    }

    /**
     * Consistency check intended to be used as {@code assert invariant();} while holding the mutex.
     *
     * <p>Asserts that the active fetch counter lies within {@code [0, maxConcurrentFetches]} and that the
     * known, unknown and failed collections are pairwise disjoint.
     *
     * <p>NOTE(review): {@code maxConcurrentFetches} is written by {@code setMaxConcurrentFetches} without
     * the mutex, so lowering the setting while fetches are in flight looks like it could trip the upper
     * bound assertion - confirm.
     *
     * @return always {@code true}; violations surface as {@link AssertionError}s
     */
    private boolean invariant() {
        assert Thread.holdsLock(mutex);
        assert activeFetches >= 0 : "active fetches should be greater than or equal to zero but got: " + activeFetches;
        assert activeFetches <= maxConcurrentFetches : activeFetches + " <= " + maxConcurrentFetches;

        for (SnapshotShard known : knownSnapshotShards.keySet()) {
            assert !unknownSnapshotShards.contains(known) : "cannot be known and unknown at same time: " + known;
            assert !failedSnapshotShards.contains(known) : "cannot be known and failed at same time: " + known;
        }
        for (SnapshotShard unknown : unknownSnapshotShards) {
            assert !knownSnapshotShards.containsKey(unknown) : "cannot be unknown and known at same time: " + unknown;
            assert !failedSnapshotShards.contains(unknown) : "cannot be unknown and failed at same time: " + unknown;
        }
        for (SnapshotShard failed : failedSnapshotShards) {
            assert !knownSnapshotShards.containsKey(failed) : "cannot be failed and known at same time: " + failed;
            assert !unknownSnapshotShards.contains(failed) : "cannot be failed and unknown at same time: " + failed;
        }
        return true;
    }

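    /*
     * Decompiler note (CFR): a try block that caught itself was removed while reconstructing this
     * method - most likely the compiler-generated handler that releases the monitor of the
     * synchronized block below. Behaviour may differ from the original; verify against the source.
     */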
    /**
     * Package-private accessor for the number of snapshot shards whose size is still unknown
     * (queued or currently being fetched), read under the mutex.
     *
     * @return the current size of the unknown set
     */
    int numberOfUnknownSnapshotShardSizes() {
        synchronized (mutex) {
            final int pending = unknownSnapshotShards.size();
            return pending;
        }
    }

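    /*
     * Decompiler note (CFR): a try block that caught itself was removed while reconstructing this
     * method - most likely the compiler-generated handler that releases the monitor of the
     * synchronized block below. Behaviour may differ from the original; verify against the source.
     */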
    /**
     * Package-private accessor for the number of snapshot shards whose size fetch failed,
     * read under the mutex.
     *
     * @return the current size of the failed set
     */
    int numberOfFailedSnapshotShardSizes() {
        synchronized (mutex) {
            final int failures = failedSnapshotShards.size();
            return failures;
        }
    }

    /**
     * Package-private accessor for the number of snapshot shards with a known size.
     *
     * <p>Reads the volatile map reference without taking the mutex.
     *
     * @return the size of the currently published known-sizes map
     */
    int numberOfKnownSnapshotShardSizes() {
        final Map<SnapshotShard, Long> published = knownSnapshotShards;
        return published.size();
    }

    /**
     * Collects the snapshot shards the given cluster state is waiting to restore.
     *
     * <p>Only unassigned primary shards whose recovery source type is {@code SNAPSHOT} are considered.
     *
     * @param state the cluster state to inspect
     * @return an unmodifiable set with one {@link SnapshotShard} per matching shard routing
     */
    private static Set<SnapshotShard> listOfSnapshotShards(ClusterState state) {
        final Set<SnapshotShard> restoring = new HashSet<>();
        for (ShardRouting unassigned : state.routingTable().shardsWithState(ShardRoutingState.UNASSIGNED)) {
            final boolean primaryFromSnapshot = unassigned.primary()
                && unassigned.recoverySource().getType() == RecoverySource.Type.SNAPSHOT;
            if (primaryFromSnapshot) {
                final RecoverySource.SnapshotRecoverySource source = (RecoverySource.SnapshotRecoverySource) unassigned.recoverySource();
                restoring.add(new SnapshotShard(source.snapshot(), source.index(), unassigned.shardId(), source.pinnedTimestamp()));
            }
        }
        return Collections.unmodifiableSet(restoring);
    }

    /**
     * Immutable key identifying one shard of one index within one snapshot.
     *
     * <p>Instances are used as keys of hash-based collections. Equality, hash code and string form are
     * based on the snapshot, the index and the shard id only; the pinned timestamp is carried along but
     * does not take part in them.
     * NOTE(review): confirm that excluding {@code pinnedTimestamp} from {@code equals}/{@code hashCode}
     * is intentional.
     */
    @PublicApi(since="1.0.0")
    public static class SnapshotShard {
        private final Snapshot snapshot;
        private final IndexId index;
        private final ShardId shardId;
        // final: assigned once in the constructor and read from fetcher threads
        private final long pinnedTimestamp;

        /**
         * Creates a snapshot shard with a pinned timestamp of {@code 0}.
         *
         * @param snapshot the snapshot the shard belongs to
         * @param index    the index inside the snapshot
         * @param shardId  the shard id
         */
        public SnapshotShard(Snapshot snapshot, IndexId index, ShardId shardId) {
            this(snapshot, index, shardId, 0L);
        }

        /**
         * Creates a snapshot shard.
         *
         * @param snapshot        the snapshot the shard belongs to
         * @param index           the index inside the snapshot
         * @param shardId         the shard id
         * @param pinnedTimestamp pinned timestamp associated with the recovery source; a value greater
         *                        than zero makes the size fetcher report a size of 0 without contacting
         *                        the repository
         */
        public SnapshotShard(Snapshot snapshot, IndexId index, ShardId shardId, long pinnedTimestamp) {
            this.snapshot = snapshot;
            this.index = index;
            this.shardId = shardId;
            this.pinnedTimestamp = pinnedTimestamp;
        }

        /** @return the snapshot the shard belongs to */
        public Snapshot snapshot() {
            return snapshot;
        }

        /** @return the index inside the snapshot */
        public IndexId index() {
            return index;
        }

        /** @return the shard id */
        public ShardId shardId() {
            return shardId;
        }

        /**
         * Two snapshot shards are equal when they are of the same class and their shard id, snapshot and
         * index are equal. Null components are tolerated, matching {@link #hashCode()}.
         */
        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || getClass() != o.getClass()) {
                return false;
            }
            final SnapshotShard that = (SnapshotShard) o;
            return Objects.equals(shardId, that.shardId)
                && Objects.equals(snapshot, that.snapshot)
                && Objects.equals(index, that.index);
        }

        @Override
        public int hashCode() {
            return Objects.hash(snapshot, index, shardId);
        }

        @Override
        public String toString() {
            return "[snapshot=" + snapshot + ", index=" + index + ", shard=" + shardId + "]";
        }
    }

    /**
     * Task that resolves the size of a single snapshot shard and records the outcome in the enclosing
     * service.
     *
     * <p>On success the shard moves from the unknown set to the known-sizes map; on failure it is added
     * to the failed set. In both cases the active fetch counter is decremented, and a reroute is
     * requested when the recorded state actually changed. {@link #onAfter()} then tries to start the
     * next queued fetch.
     */
    private class FetchingSnapshotShardSizeRunnable
    extends AbstractRunnable {
        private final SnapshotShard snapshotShard;
        /**
         * Whether {@link #doRun()} already removed the shard from the unknown set; lets
         * {@link #onFailure(Exception)} skip a second removal. Only accessed under the mutex.
         */
        private boolean removed;

        FetchingSnapshotShardSizeRunnable(SnapshotShard snapshotShard) {
            this.snapshotShard = snapshotShard;
            this.removed = false;
        }

        /**
         * Fetches the shard size and publishes it.
         *
         * <p>A shard with a positive pinned timestamp is reported with a size of 0 and the repository is
         * not queried for its status; otherwise the total size comes from the shard snapshot status.
         * The size is only stored while the local node is still flagged as cluster manager.
         *
         * <p>NOTE(review): if {@code reroute(...)} throws after the bookkeeping below and the base class
         * routes that exception to {@link #onFailure(Exception)}, {@code activeFetches} would be
         * decremented a second time - confirm against {@code AbstractRunnable}.
         */
        @Override
        protected void doRun() throws Exception {
            final RepositoriesService repositories = repositoriesService.get();
            assert repositories != null;
            final Repository repository = repositories.repository(snapshotShard.snapshot().getRepository());

            logger.debug("fetching snapshot shard size for {}", snapshotShard);
            final long snapshotShardSize;
            if (snapshotShard.pinnedTimestamp > 0L) {
                snapshotShardSize = 0L;
            } else {
                snapshotShardSize = repository.getShardSnapshotStatus(
                    snapshotShard.snapshot().getSnapshotId(),
                    snapshotShard.index(),
                    snapshotShard.shardId()
                ).asCopy().getTotalSize();
            }
            logger.debug("snapshot shard size for {}: {} bytes", snapshotShard, snapshotShardSize);

            boolean updated = false;
            synchronized (mutex) {
                removed = unknownSnapshotShards.remove(snapshotShard);
                // report the shard that was expected to be pending, not the fetched byte count
                assert removed : "snapshot shard to remove does not exist " + snapshotShard;
                if (isMaster) {
                    // Copy-then-publish so readers of the volatile map never observe a partial update.
                    final Map<SnapshotShard, Long> newSnapshotShardSizes = new HashMap<>(knownSnapshotShards);
                    updated = newSnapshotShardSizes.put(snapshotShard, snapshotShardSize) == null;
                    assert updated : "snapshot shard size already exists for " + snapshotShard;
                    knownSnapshotShards = Collections.unmodifiableMap(newSnapshotShardSizes);
                }
                activeFetches -= 1;
                assert invariant();
            }
            if (updated) {
                rerouteService.get().reroute("snapshot shard size updated", Priority.HIGH, REROUTE_LISTENER);
            }
        }

        /**
         * Records a failed fetch.
         *
         * <p>While the local node is still flagged as cluster manager the shard is added to the failed
         * set. The shard is removed from the unknown set unless {@link #doRun()} already did so, the
         * active fetch counter is decremented, and a reroute is requested when the shard was newly
         * marked as failed.
         *
         * @param e the failure that aborted the fetch
         */
        @Override
        public void onFailure(Exception e) {
            logger.warn(() -> new ParameterizedMessage("failed to retrieve shard size for {}", snapshotShard), e);
            boolean failed = false;
            synchronized (mutex) {
                if (isMaster) {
                    failed = failedSnapshotShards.add(snapshotShard);
                    assert failed : "snapshot shard size already failed for " + snapshotShard;
                }
                if (!removed) {
                    unknownSnapshotShards.remove(snapshotShard);
                }
                activeFetches -= 1;
                assert invariant();
            }
            if (failed) {
                rerouteService.get().reroute("snapshot shard size failed", Priority.HIGH, REROUTE_LISTENER);
            }
        }

        /** Tries to start the next queued fetch once this one has finished. */
        @Override
        public void onAfter() {
            fetchNextSnapshotShard();
        }
    }
}

