/*
 * Decompiled with CFR 0.152.
 */
package kafka.server.share;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import kafka.server.FetchSession;
import kafka.server.QuotaFactory$UnboundedQuota$;
import kafka.server.ReplicaManager;
import kafka.server.share.FinalContext;
import kafka.server.share.ShareFetchContext;
import kafka.server.share.SharePartition;
import kafka.server.share.ShareSessionContext;
import org.apache.kafka.clients.consumer.AcknowledgeType;
import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.ShareAcknowledgeResponseData;
import org.apache.kafka.common.message.ShareFetchResponseData;
import org.apache.kafka.common.metrics.CompoundStat;
import org.apache.kafka.common.metrics.MeasurableStat;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.metrics.stats.Max;
import org.apache.kafka.common.metrics.stats.Meter;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.BaseRecords;
import org.apache.kafka.common.record.FileRecords;
import org.apache.kafka.common.requests.FetchRequest;
import org.apache.kafka.common.requests.ShareFetchMetadata;
import org.apache.kafka.common.requests.ShareFetchRequest;
import org.apache.kafka.common.utils.ImplicitLinkedHashCollection;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.group.share.Persister;
import org.apache.kafka.server.share.CachedSharePartition;
import org.apache.kafka.server.share.ShareAcknowledgementBatch;
import org.apache.kafka.server.share.ShareSession;
import org.apache.kafka.server.share.ShareSessionCache;
import org.apache.kafka.server.share.ShareSessionKey;
import org.apache.kafka.server.util.timer.SystemTimer;
import org.apache.kafka.server.util.timer.SystemTimerReaper;
import org.apache.kafka.server.util.timer.Timer;
import org.apache.kafka.storage.internals.log.FetchParams;
import org.apache.kafka.storage.internals.log.FetchPartitionData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Function1;
import scala.Option;
import scala.Tuple2;
import scala.collection.Seq;
import scala.jdk.javaapi.CollectionConverters;
import scala.runtime.BoxedUnit;

/**
 * Broker-side entry point for share-group fetch and acknowledge handling.
 *
 * <p>The manager queues share fetch requests, reads records through the {@link ReplicaManager},
 * routes acquire / acknowledge / release calls to the {@link SharePartition} that belongs to each
 * (group id, topic-id partition) pair, and builds {@link ShareFetchContext} instances from the
 * {@link ShareSessionCache}.
 */
public class SharePartitionManager
implements AutoCloseable {
    private static final Logger log = LoggerFactory.getLogger(SharePartitionManager.class);
    // Share partitions keyed by (group id, topic-id partition); entries are created lazily while
    // the fetch queue is processed.
    private final Map<SharePartitionKey, SharePartition> partitionCacheMap;
    // Used to read records and to look up the earliest offset of a partition.
    private final ReplicaManager replicaManager;
    // Clock used for session timestamps and for the partition load-time metric.
    private final Time time;
    // Cache of share sessions; also used as the monitor when an existing session is updated.
    private final ShareSessionCache cache;
    // Share fetch requests waiting to be processed.
    private final ConcurrentLinkedQueue<ShareFetchPartitionData> fetchQueue;
    // Compare-and-set flag that lets only one caller at a time process the fetch queue.
    private final AtomicBoolean processFetchQueueLock;
    // Handed to every SharePartition created by this manager.
    private final int recordLockDurationMs;
    // Handed to every SharePartition created by this manager; closed in close().
    private final Timer timer;
    // Handed to every SharePartition created by this manager.
    private final int maxInFlightMessages;
    // Handed to every SharePartition created by this manager.
    private final int maxDeliveryCount;
    // Handed to every SharePartition created by this manager; stopped in close().
    private final Persister persister;
    // Sensors for acknowledgement counts/rates and share partition load time.
    private final ShareGroupMetrics shareGroupMetrics;

    /**
     * Creates a manager that starts with an empty, concurrent share-partition cache.
     *
     * @param replicaManager       replica manager used to read records and look up offsets
     * @param time                 clock used for session timestamps and metrics
     * @param cache                share session cache
     * @param recordLockDurationMs value handed to every {@link SharePartition} that gets created
     * @param maxDeliveryCount     value handed to every {@link SharePartition} that gets created
     * @param maxInFlightMessages  value handed to every {@link SharePartition} that gets created
     * @param persister            persister handed to share partitions and stopped on close
     * @param metrics              metrics registry; must not be {@code null}
     */
    public SharePartitionManager(ReplicaManager replicaManager, Time time, ShareSessionCache cache, int recordLockDurationMs, int maxDeliveryCount, int maxInFlightMessages, Persister persister, Metrics metrics) {
        this(
            replicaManager,
            time,
            cache,
            new ConcurrentHashMap<>(),
            recordLockDurationMs,
            maxDeliveryCount,
            maxInFlightMessages,
            persister,
            metrics
        );
    }

    /**
     * Creates a manager on top of a caller-supplied share-partition cache, with a fresh fetch
     * queue and a newly created lock-timeout timer.
     *
     * <p>All state is wired up by the package-private constructor; this one only supplies the
     * queue and the timer. The queue is created before the timer, and {@code metrics} is
     * null-checked only after both exist.
     */
    private SharePartitionManager(ReplicaManager replicaManager, Time time, ShareSessionCache cache, Map<SharePartitionKey, SharePartition> partitionCacheMap, int recordLockDurationMs, int maxDeliveryCount, int maxInFlightMessages, Persister persister, Metrics metrics) {
        this(
            replicaManager,
            time,
            cache,
            partitionCacheMap,
            new ConcurrentLinkedQueue<>(),
            recordLockDurationMs,
            new SystemTimerReaper("share-group-lock-timeout-reaper", (Timer) new SystemTimer("share-group-lock-timeout")),
            maxDeliveryCount,
            maxInFlightMessages,
            persister,
            metrics
        );
    }

    /**
     * Fully parameterised constructor: every collaborator, including the fetch queue and the
     * timer, is supplied by the caller.
     *
     * @throws NullPointerException if {@code metrics} is {@code null}
     */
    SharePartitionManager(ReplicaManager replicaManager, Time time, ShareSessionCache cache, Map<SharePartitionKey, SharePartition> partitionCacheMap, ConcurrentLinkedQueue<ShareFetchPartitionData> fetchQueue, int recordLockDurationMs, Timer timer, int maxDeliveryCount, int maxInFlightMessages, Persister persister, Metrics metrics) {
        // Collaborators.
        this.replicaManager = replicaManager;
        this.persister = persister;
        this.timer = timer;
        this.time = time;

        // Session and partition state.
        this.cache = cache;
        this.partitionCacheMap = partitionCacheMap;

        // Fetch queue and the flag guarding its processing; the flag starts out released.
        this.fetchQueue = fetchQueue;
        this.processFetchQueueLock = new AtomicBoolean(false);

        // Settings forwarded to every share partition.
        this.recordLockDurationMs = recordLockDurationMs;
        this.maxInFlightMessages = maxInFlightMessages;
        this.maxDeliveryCount = maxDeliveryCount;

        this.shareGroupMetrics = new ShareGroupMetrics(Objects.requireNonNull(metrics), time);
    }

    /**
     * Enqueues a share fetch request and triggers processing of the fetch queue.
     *
     * @param groupId           share group id
     * @param memberId          member issuing the fetch
     * @param fetchParams       parameters forwarded to the replica manager
     * @param topicIdPartitions partitions to fetch from
     * @param partitionMaxBytes per-partition byte limit; partitions without an entry use 0
     * @return a future that is completed with the per-partition response data
     */
    public CompletableFuture<Map<TopicIdPartition, ShareFetchResponseData.PartitionData>> fetchMessages(String groupId, String memberId, FetchParams fetchParams, List<TopicIdPartition> topicIdPartitions, Map<TopicIdPartition, Integer> partitionMaxBytes) {
        log.trace("Fetch request for topicIdPartitions: {} with groupId: {} fetch params: {}", topicIdPartitions, groupId, fetchParams);
        CompletableFuture<Map<TopicIdPartition, ShareFetchResponseData.PartitionData>> responseFuture = new CompletableFuture<>();
        fetchQueue.add(new ShareFetchPartitionData(fetchParams, groupId, memberId, topicIdPartitions, responseFuture, partitionMaxBytes));
        maybeProcessFetchQueue();
        return responseFuture;
    }

    /**
     * Acknowledges record batches for the given member, one share partition at a time.
     *
     * <p>A partition that has no cached {@link SharePartition} is answered with
     * {@code UNKNOWN_TOPIC_OR_PARTITION}. For a partition acknowledged successfully, every
     * acknowledge type of every batch is recorded in the share group metrics.
     *
     * @param memberId          member sending the acknowledgement
     * @param groupId           share group id
     * @param acknowledgeTopics acknowledgement batches per partition
     * @return a future holding one response entry (partition index and error code) per partition
     */
    public CompletableFuture<Map<TopicIdPartition, ShareAcknowledgeResponseData.PartitionData>> acknowledge(String memberId, String groupId, Map<TopicIdPartition, List<ShareAcknowledgementBatch>> acknowledgeTopics) {
        log.trace("Acknowledge request for topicIdPartitions: {} with groupId: {}", acknowledgeTopics.keySet(), groupId);
        shareGroupMetrics.shareAcknowledgement();

        Map<TopicIdPartition, CompletableFuture<Errors>> pending = new HashMap<>();
        for (Map.Entry<TopicIdPartition, List<ShareAcknowledgementBatch>> entry : acknowledgeTopics.entrySet()) {
            TopicIdPartition topicIdPartition = entry.getKey();
            List<ShareAcknowledgementBatch> batches = entry.getValue();
            SharePartition sharePartition = partitionCacheMap.get(sharePartitionKey(groupId, topicIdPartition));
            if (sharePartition == null) {
                pending.put(topicIdPartition, CompletableFuture.completedFuture(Errors.UNKNOWN_TOPIC_OR_PARTITION));
                continue;
            }
            CompletableFuture<Errors> outcome = sharePartition.acknowledge(memberId, batches).thenApply(failure -> {
                if (failure.isPresent()) {
                    return Errors.forException(failure.get());
                }
                // Metrics are only recorded for acknowledgements that went through.
                for (ShareAcknowledgementBatch batch : batches) {
                    batch.acknowledgeTypes().forEach(shareGroupMetrics::recordAcknowledgement);
                }
                return Errors.NONE;
            });
            pending.put(topicIdPartition, outcome);
        }

        CompletableFuture<?>[] outcomes = pending.values().toArray(new CompletableFuture[0]);
        return CompletableFuture.allOf(outcomes).thenApply(ignored -> {
            Map<TopicIdPartition, ShareAcknowledgeResponseData.PartitionData> result = new HashMap<>();
            for (Map.Entry<TopicIdPartition, CompletableFuture<Errors>> done : pending.entrySet()) {
                TopicIdPartition topicIdPartition = done.getKey();
                short errorCode = done.getValue().join().code();
                result.put(topicIdPartition, new ShareAcknowledgeResponseData.PartitionData()
                    .setPartitionIndex(topicIdPartition.partition())
                    .setErrorCode(errorCode));
            }
            return result;
        });
    }

    /**
     * Releases the records acquired by a member on every partition of its cached share session.
     *
     * <p>If the session has no partitions (or does not exist) an already-completed future with
     * an empty map is returned. A session partition without a cached {@link SharePartition} is
     * logged and answered with {@code UNKNOWN_TOPIC_OR_PARTITION}.
     *
     * @param groupId  share group id
     * @param memberId member id; parsed with {@link Uuid#fromString(String)}
     * @return a future holding one response entry (partition index and error code) per partition
     */
    public CompletableFuture<Map<TopicIdPartition, ShareAcknowledgeResponseData.PartitionData>> releaseAcquiredRecords(String groupId, String memberId) {
        log.trace("Release acquired records request for groupId: {}, memberId: {}", groupId, memberId);
        List<TopicIdPartition> sessionPartitions = cachedTopicIdPartitionsInShareSession(groupId, Uuid.fromString(memberId));
        if (sessionPartitions.isEmpty()) {
            return CompletableFuture.completedFuture(Collections.emptyMap());
        }

        Map<TopicIdPartition, CompletableFuture<Errors>> pending = new HashMap<>();
        for (TopicIdPartition topicIdPartition : sessionPartitions) {
            SharePartition sharePartition = partitionCacheMap.get(sharePartitionKey(groupId, topicIdPartition));
            if (sharePartition != null) {
                CompletableFuture<Errors> outcome = sharePartition.releaseAcquiredRecords(memberId)
                    .thenApply(failure -> failure.isPresent() ? Errors.forException(failure.get()) : Errors.NONE);
                pending.put(topicIdPartition, outcome);
            } else {
                log.error("No share partition found for groupId {} topicPartition {} while releasing acquired topic partitions", groupId, topicIdPartition);
                pending.put(topicIdPartition, CompletableFuture.completedFuture(Errors.UNKNOWN_TOPIC_OR_PARTITION));
            }
        }

        CompletableFuture<?>[] outcomes = pending.values().toArray(new CompletableFuture[pending.size()]);
        return CompletableFuture.allOf(outcomes).thenApply(ignored -> {
            Map<TopicIdPartition, ShareAcknowledgeResponseData.PartitionData> result = new HashMap<>();
            for (Map.Entry<TopicIdPartition, CompletableFuture<Errors>> done : pending.entrySet()) {
                TopicIdPartition topicIdPartition = done.getKey();
                short errorCode = done.getValue().join().code();
                result.put(topicIdPartition, new ShareAcknowledgeResponseData.PartitionData()
                    .setPartitionIndex(topicIdPartition.partition())
                    .setErrorCode(errorCode));
            }
            return result;
        });
    }

    /**
     * Builds the {@link ShareFetchContext} for a share fetch request.
     *
     * <ul>
     *   <li>Full request with epoch {@code -1}: the session is removed from the cache and a
     *       {@link FinalContext} is returned. Such a request must not carry partitions with a
     *       positive {@code maxBytes}.</li>
     *   <li>Any other full request: an existing session for the key is dropped and a new one is
     *       created from the requested partitions.</li>
     *   <li>Non-full request: the existing session is validated, updated and its epoch
     *       advanced, all while holding the monitor of the session cache.</li>
     * </ul>
     *
     * <p>Partitions whose {@code maxBytes} is not positive are ignored throughout.
     *
     * <p>NOTE(review): the decompiler that produced this source flagged that it dropped a
     * try/catch around the synchronized region ("possible behaviour change"); confirm against
     * the upstream source if exact monitor handling matters.
     *
     * @param groupId        share group id
     * @param shareFetchData requested partitions and their fetch settings
     * @param toForget       partitions to drop from an existing session (used for non-full requests)
     * @param reqMetadata    member id, epoch and full/incremental flag of the request
     * @return the context to use for serving this request
     * @throws RuntimeException the exception mapped to {@code INVALID_REQUEST},
     *         {@code SHARE_SESSION_NOT_FOUND} or {@code INVALID_SHARE_SESSION_EPOCH} when the
     *         request does not match the session state
     */
    public ShareFetchContext newContext(String groupId, Map<TopicIdPartition, ShareFetchRequest.SharePartitionData> shareFetchData, List<TopicIdPartition> toForget, ShareFetchMetadata reqMetadata) {
        Map<TopicIdPartition, ShareFetchRequest.SharePartitionData> shareFetchDataWithMaxBytes = new HashMap<>();
        for (Map.Entry<TopicIdPartition, ShareFetchRequest.SharePartitionData> requested : shareFetchData.entrySet()) {
            if (requested.getValue().maxBytes > 0) {
                shareFetchDataWithMaxBytes.put(requested.getKey(), requested.getValue());
            }
        }

        if (!reqMetadata.isFull()) {
            // Incremental request: validate and update the existing session under the cache monitor.
            synchronized (cache) {
                ShareSessionKey key = shareSessionKey(groupId, reqMetadata.memberId());
                ShareSession shareSession = cache.get(key);
                if (shareSession == null) {
                    log.error("Share session error for {}: no such share session found", key);
                    throw Errors.SHARE_SESSION_NOT_FOUND.exception();
                }
                if (shareSession.epoch != reqMetadata.epoch()) {
                    log.debug("Share session error for {}: expected epoch {}, but got {} instead", key, shareSession.epoch, reqMetadata.epoch());
                    throw Errors.INVALID_SHARE_SESSION_EPOCH.exception();
                }
                Map modifiedTopicIdPartitions = shareSession.update(shareFetchDataWithMaxBytes, toForget);
                cache.touch(shareSession, time.milliseconds());
                shareSession.epoch = ShareFetchMetadata.nextEpoch((int) shareSession.epoch);
                log.debug("Created a new ShareSessionContext for session key {}, epoch {}: added {}, updated {}, removed {}",
                    shareSession.key(),
                    shareSession.epoch,
                    partitionsToLogString((Collection) modifiedTopicIdPartitions.get(ShareSession.ModifiedTopicIdPartitionType.ADDED)),
                    partitionsToLogString((Collection) modifiedTopicIdPartitions.get(ShareSession.ModifiedTopicIdPartitionType.UPDATED)),
                    partitionsToLogString((Collection) modifiedTopicIdPartitions.get(ShareSession.ModifiedTopicIdPartitionType.REMOVED)));
                return new ShareSessionContext(reqMetadata, shareSession);
            }
        }

        ShareSessionKey key = shareSessionKey(groupId, reqMetadata.memberId());

        // Epoch -1 closes the session (presumably ShareFetchMetadata's final-epoch marker — confirm).
        if (reqMetadata.epoch() == -1) {
            if (!shareFetchDataWithMaxBytes.isEmpty()) {
                throw Errors.INVALID_REQUEST.exception();
            }
            ShareFetchContext finalContext = new FinalContext();
            if (cache.remove(key) != null) {
                log.debug("Removed share session with key {}", key);
            }
            return finalContext;
        }

        // Full request: start over with a brand-new session for this member.
        if (cache.remove(key) != null) {
            log.debug("Removed share session with key {}", key);
        }
        ImplicitLinkedHashCollection<CachedSharePartition> cachedSharePartitions = new ImplicitLinkedHashCollection<>(shareFetchDataWithMaxBytes.size());
        for (Map.Entry<TopicIdPartition, ShareFetchRequest.SharePartitionData> requested : shareFetchDataWithMaxBytes.entrySet()) {
            cachedSharePartitions.mustAdd(new CachedSharePartition(requested.getKey(), requested.getValue(), false));
        }
        ShareSessionKey responseShareSessionKey = cache.maybeCreateSession(groupId, reqMetadata.memberId(), time.milliseconds(), cachedSharePartitions);
        if (responseShareSessionKey == null) {
            log.error("Could not create a share session for group {} member {}", groupId, reqMetadata.memberId());
            throw Errors.SHARE_SESSION_NOT_FOUND.exception();
        }
        ShareFetchContext sessionContext = new ShareSessionContext(reqMetadata, shareFetchDataWithMaxBytes);
        log.debug("Created a new ShareSessionContext with key {} isSubsequent {} returning {}. A new share session will be started.",
            responseShareSessionKey, false, partitionsToLogString(shareFetchDataWithMaxBytes.keySet()));
        return sessionContext;
    }

    /**
     * Lists the partitions held by the share session of the given group member.
     *
     * @param groupId  share group id
     * @param memberId member id
     * @return the session's partitions, or an empty list when no session is cached for the key
     */
    List<TopicIdPartition> cachedTopicIdPartitionsInShareSession(String groupId, Uuid memberId) {
        ShareSession shareSession = cache.get(shareSessionKey(groupId, memberId));
        if (shareSession == null) {
            return Collections.emptyList();
        }
        List<TopicIdPartition> sessionPartitions = new ArrayList<>();
        for (CachedSharePartition cached : shareSession.partitionMap()) {
            TopicPartition topicPartition = new TopicPartition(cached.topic(), cached.partition());
            sessionPartitions.add(new TopicIdPartition(cached.topicId(), topicPartition));
        }
        return sessionPartitions;
    }

    /**
     * Closes the lock-timeout timer and stops the persister.
     *
     * <p>The persister is stopped even if closing the timer fails, so a failure in the first
     * step cannot leak the second resource. If both steps throw, the persister's exception is
     * the one that propagates.
     *
     * @throws Exception if closing the timer or stopping the persister fails
     */
    @Override
    public void close() throws Exception {
        try {
            this.timer.close();
        } finally {
            this.persister.stop();
        }
    }

    /** Builds the cache key that identifies the share session of a member within a group. */
    private ShareSessionKey shareSessionKey(String groupId, Uuid memberId) {
        ShareSessionKey sessionKey = new ShareSessionKey(groupId, memberId);
        return sessionKey;
    }

    /**
     * Renders partitions for log output by delegating to {@link FetchSession}; whether trace
     * logging is enabled for this class is passed along as the second argument.
     */
    private static String partitionsToLogString(Collection<TopicIdPartition> partitions) {
        boolean traceEnabled = log.isTraceEnabled();
        return FetchSession.partitionsToLogString(partitions, traceEnabled);
    }

    /**
     * Processes the next queued share fetch request, unless another caller is already doing so.
     *
     * <p>Steps:
     * <ol>
     *   <li>Take the process-queue flag with a compare-and-set; return if it is already taken.</li>
     *   <li>Poll one request. For each of its partitions, look up (or create) the
     *       {@link SharePartition}, try to take its fetch lock and, if it can acquire more
     *       records, add it to the set of partitions to read.</li>
     *   <li>If no partition is readable, complete the request with an empty map, release the
     *       flag and move on to the next queued request.</li>
     *   <li>Otherwise read through the replica manager; the callback turns the read result into
     *       the response, completes the request's future and releases the flag and the
     *       partition fetch locks.</li>
     * </ol>
     *
     * <p>Failure handling: whenever processing fails, either in this method or synchronously
     * inside the replica manager callback, the request's future is completed exceptionally
     * before the locks are released. Without that the caller would wait forever on a future
     * nobody completes.
     */
    void maybeProcessFetchQueue() {
        if (!this.processFetchQueueLock.compareAndSet(false, true)) {
            // Someone else is processing the queue.
            return;
        }
        // Partitions of the polled request whose fetch lock is held by this invocation.
        Map<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData = new LinkedHashMap<>();
        ShareFetchPartitionData shareFetchPartitionData = this.fetchQueue.poll();
        if (shareFetchPartitionData == null) {
            this.releaseProcessFetchQueueLock();
            return;
        }
        try {
            shareFetchPartitionData.topicIdPartitions.forEach(topicIdPartition -> {
                SharePartitionKey sharePartitionKey = this.sharePartitionKey(shareFetchPartitionData.groupId, topicIdPartition);
                SharePartition sharePartition = this.partitionCacheMap.computeIfAbsent(sharePartitionKey, k -> {
                    long start = this.time.hiResClockMs();
                    SharePartition partition = new SharePartition(shareFetchPartitionData.groupId, topicIdPartition, this.maxInFlightMessages, this.maxDeliveryCount, this.recordLockDurationMs, this.timer, this.time, this.persister);
                    this.shareGroupMetrics.partitionLoadTime(start);
                    return partition;
                });
                int partitionMaxBytes = shareFetchPartitionData.partitionMaxBytes.getOrDefault(topicIdPartition, 0);
                if (sharePartition.maybeAcquireFetchLock()) {
                    if (sharePartition.canAcquireRecords()) {
                        topicPartitionData.put(topicIdPartition, new FetchRequest.PartitionData(topicIdPartition.topicId(), sharePartition.nextFetchOffset(), 0L, partitionMaxBytes, Optional.empty()));
                    } else {
                        // Nothing to read for this partition right now; give its lock back.
                        sharePartition.releaseFetchLock();
                        log.info("Record lock partition limit exceeded for SharePartition with key {}, cannot acquire more records", sharePartitionKey);
                    }
                }
            });
            if (topicPartitionData.isEmpty()) {
                // No partition could be read; answer with an empty result and continue with the queue.
                shareFetchPartitionData.future.complete(Collections.emptyMap());
                this.releaseProcessFetchQueueLock();
                if (!this.fetchQueue.isEmpty()) {
                    this.maybeProcessFetchQueue();
                }
                return;
            }
            log.trace("Fetchable share partitions data: {} with groupId: {} fetch params: {}", topicPartitionData, shareFetchPartitionData.groupId, shareFetchPartitionData.fetchParams);

            Seq<Tuple2<TopicIdPartition, FetchRequest.PartitionData>> fetchInfos = CollectionConverters.asScala(
                topicPartitionData.entrySet().stream()
                    .map(entry -> new Tuple2<>(entry.getKey(), entry.getValue()))
                    .collect(Collectors.toList()));

            Function1<Seq<Tuple2<TopicIdPartition, FetchPartitionData>>, BoxedUnit> responseCallback = responsePartitionData -> {
                log.trace("Data successfully retrieved by replica manager: {}", responsePartitionData);
                try {
                    List<Tuple2<TopicIdPartition, FetchPartitionData>> responseData = CollectionConverters.asJava(responsePartitionData);
                    this.processFetchResponse(shareFetchPartitionData, responseData).whenComplete((result, throwable) -> {
                        if (throwable != null) {
                            log.error("Error processing fetch response for share partitions", throwable);
                            shareFetchPartitionData.future.completeExceptionally(throwable);
                        } else {
                            shareFetchPartitionData.future.complete(result);
                        }
                        this.releaseFetchQueueAndPartitionsLock(shareFetchPartitionData.groupId, topicPartitionData.keySet());
                    });
                } catch (Exception e) {
                    // processFetchResponse failed before a future existed, so the completion
                    // handler above was never attached: fail the request and free the locks here.
                    log.error("Error processing fetch response for share partitions", e);
                    shareFetchPartitionData.future.completeExceptionally(e);
                    this.releaseFetchQueueAndPartitionsLock(shareFetchPartitionData.groupId, topicPartitionData.keySet());
                }
                return BoxedUnit.UNIT;
            };

            this.replicaManager.fetchMessages(shareFetchPartitionData.fetchParams, fetchInfos, QuotaFactory$UnboundedQuota$.MODULE$, responseCallback);
            if (!this.fetchQueue.isEmpty()) {
                this.maybeProcessFetchQueue();
            }
        }
        catch (Exception e) {
            log.error("Error processing fetch queue for share partitions", e);
            // Fail the request so its caller is not left waiting; this is a no-op if the
            // future has already been completed.
            shareFetchPartitionData.future.completeExceptionally(e);
            this.releaseFetchQueueAndPartitionsLock(shareFetchPartitionData.groupId, topicPartitionData.keySet());
        }
    }

    /**
     * Converts the replica manager's read result into share fetch response data.
     *
     * <p>For each partition the fetched data is offered to the matching {@link SharePartition}
     * via {@code acquire}. The response entry is then built as follows:
     * <ul>
     *   <li>acquire failed: only the error code derived from the failure is set;</li>
     *   <li>the read reported {@code OFFSET_OUT_OF_RANGE}: the share partition is moved to the
     *       offset returned by {@link #offsetForEarliestTimestamp} and an empty, error-free
     *       entry is returned;</li>
     *   <li>otherwise: the fetched records, the read's error code and the acquired records.</li>
     * </ul>
     *
     * @param shareFetchPartitionData the request being served
     * @param responseData            read result per partition
     * @return a future that completes with the response entries once every partition is done
     */
    CompletableFuture<Map<TopicIdPartition, ShareFetchResponseData.PartitionData>> processFetchResponse(ShareFetchPartitionData shareFetchPartitionData, List<Tuple2<TopicIdPartition, FetchPartitionData>> responseData) {
        Map<TopicIdPartition, CompletableFuture<ShareFetchResponseData.PartitionData>> pending = new HashMap<>();

        for (Tuple2<TopicIdPartition, FetchPartitionData> data : responseData) {
            TopicIdPartition topicIdPartition = data._1;
            FetchPartitionData fetchPartitionData = data._2;
            SharePartition sharePartition = partitionCacheMap.get(sharePartitionKey(shareFetchPartitionData.groupId, topicIdPartition));

            CompletableFuture<ShareFetchResponseData.PartitionData> entry = sharePartition
                .acquire(shareFetchPartitionData.memberId, fetchPartitionData)
                .handle((acquiredRecords, failure) -> {
                    log.trace("Acquired records for topicIdPartition: {} with share fetch data: {}, records: {}", topicIdPartition, shareFetchPartitionData, acquiredRecords);
                    ShareFetchResponseData.PartitionData partitionData = new ShareFetchResponseData.PartitionData()
                        .setPartitionIndex(topicIdPartition.partition());

                    if (failure != null) {
                        partitionData.setErrorCode(Errors.forException(failure).code());
                        return partitionData;
                    }

                    boolean offsetOutOfRange = fetchPartitionData.error.code() == Errors.OFFSET_OUT_OF_RANGE.code();
                    if (offsetOutOfRange) {
                        // Restart the share partition from the earliest offset and hand back nothing this round.
                        sharePartition.updateCacheAndOffsets(offsetForEarliestTimestamp(topicIdPartition));
                        partitionData
                            .setPartitionIndex(topicIdPartition.partition())
                            .setRecords(null)
                            .setErrorCode(Errors.NONE.code())
                            .setAcquiredRecords(Collections.emptyList())
                            .setAcknowledgeErrorCode(Errors.NONE.code());
                        return partitionData;
                    }

                    partitionData
                        .setPartitionIndex(topicIdPartition.partition())
                        .setRecords((BaseRecords) fetchPartitionData.records)
                        .setErrorCode(fetchPartitionData.error.code())
                        .setAcquiredRecords(acquiredRecords)
                        .setAcknowledgeErrorCode(Errors.NONE.code());
                    return partitionData;
                });
            pending.put(topicIdPartition, entry);
        }

        CompletableFuture<?>[] entries = pending.values().toArray(new CompletableFuture[0]);
        return CompletableFuture.allOf(entries).thenApply(ignored -> {
            Map<TopicIdPartition, ShareFetchResponseData.PartitionData> processedResult = new HashMap<>();
            for (Map.Entry<TopicIdPartition, CompletableFuture<ShareFetchResponseData.PartitionData>> done : pending.entrySet()) {
                processedResult.put(done.getKey(), done.getValue().join());
            }
            return processedResult;
        });
    }

    /**
     * Releases the fetch lock of each given share partition and then the process-queue flag.
     *
     * <p>The process-queue flag is released in a {@code finally} block. If it were skipped
     * because releasing a partition lock failed, the flag would stay set and no later fetch
     * request could ever be processed. A partition without a cached {@link SharePartition} is
     * logged and skipped so that the remaining partitions are still released.
     *
     * @param groupId           share group the partitions belong to
     * @param topicIdPartitions partitions whose fetch lock should be released
     */
    void releaseFetchQueueAndPartitionsLock(String groupId, Set<TopicIdPartition> topicIdPartitions) {
        try {
            for (TopicIdPartition tp : topicIdPartitions) {
                SharePartition sharePartition = this.partitionCacheMap.get(this.sharePartitionKey(groupId, tp));
                if (sharePartition == null) {
                    log.error("No share partition found for groupId {} topicPartition {} while releasing fetch lock", groupId, tp);
                    continue;
                }
                sharePartition.releaseFetchLock();
            }
        } finally {
            this.releaseProcessFetchQueueLock();
        }
    }

    /** Clears the process-queue flag so that another caller may process the fetch queue. */
    private void releaseProcessFetchQueueLock() {
        processFetchQueueLock.set(false);
    }

    /** Builds the key under which a group's share partition is stored in the partition cache. */
    private SharePartitionKey sharePartitionKey(String groupId, TopicIdPartition topicIdPartition) {
        SharePartitionKey partitionKey = new SharePartitionKey(groupId, topicIdPartition);
        return partitionKey;
    }

    /**
     * Asks the replica manager for the offset that belongs to timestamp {@code -2} on the given
     * partition (presumably the "earliest" sentinel, as the method name suggests — confirm
     * against the list-offsets protocol constants).
     *
     * @param topicIdPartition partition to look up
     * @return the offset found, or {@code 0} when the lookup yields nothing
     */
    long offsetForEarliestTimestamp(TopicIdPartition topicIdPartition) {
        Option<FileRecords.TimestampAndOffset> earliest = replicaManager.fetchOffsetForTimestamp(
            topicIdPartition.topicPartition(),
            -2L,
            Option.<IsolationLevel>empty(),
            Optional.empty(),
            true);
        if (earliest.isEmpty()) {
            return 0L;
        }
        return earliest.get().offset;
    }

    /**
     * Sensors for share groups: acknowledge request rate/count, acknowledged record rate/count
     * per acknowledgement type, and average/maximum share partition load time.
     */
    static class ShareGroupMetrics {
        public static final String METRICS_GROUP_NAME = "share-group-metrics";
        public static final String SHARE_ACK_SENSOR = "share-acknowledgement-sensor";
        public static final String SHARE_ACK_RATE = "share-acknowledgement-rate";
        public static final String SHARE_ACK_COUNT = "share-acknowledgement-count";
        public static final String RECORD_ACK_SENSOR_PREFIX = "record-acknowledgement";
        public static final String RECORD_ACK_RATE = "record-acknowledgement-rate";
        public static final String RECORD_ACK_COUNT = "record-acknowledgement-count";
        public static final String ACK_TYPE = "ack-type";
        public static final String PARTITION_LOAD_TIME_SENSOR = "partition-load-time-sensor";
        public static final String PARTITION_LOAD_TIME_AVG = "partition-load-time-avg";
        public static final String PARTITION_LOAD_TIME_MAX = "partition-load-time-max";

        /** Acknowledgement type id mapped to the name used in sensor names and metric tags. */
        public static final Map<Byte, String> RECORD_ACKS_MAP = new HashMap<>();

        static {
            RECORD_ACKS_MAP.put((byte) 1, AcknowledgeType.ACCEPT.toString());
            RECORD_ACKS_MAP.put((byte) 2, AcknowledgeType.RELEASE.toString());
            RECORD_ACKS_MAP.put((byte) 3, AcknowledgeType.REJECT.toString());
        }

        private final Time time;
        private final Sensor shareAcknowledgementSensor;
        /** One sensor per entry of {@link #RECORD_ACKS_MAP}, keyed by acknowledgement type id. */
        private final Map<Byte, Sensor> recordAcksSensorMap = new HashMap<>();
        private final Sensor partitionLoadTimeSensor;

        /**
         * Registers all share group sensors and their metrics.
         *
         * @param metrics registry the sensors are created in
         * @param time    clock used to measure partition load time
         */
        public ShareGroupMetrics(Metrics metrics, Time time) {
            this.time = time;

            // Acknowledge requests.
            shareAcknowledgementSensor = metrics.sensor(SHARE_ACK_SENSOR);
            shareAcknowledgementSensor.add(new Meter(
                metrics.metricName(SHARE_ACK_RATE, METRICS_GROUP_NAME, "Rate of acknowledge requests."),
                metrics.metricName(SHARE_ACK_COUNT, METRICS_GROUP_NAME, "The number of acknowledge requests.")));

            // Acknowledged records, one sensor per acknowledgement type.
            RECORD_ACKS_MAP.forEach((ackTypeId, ackTypeName) -> {
                Sensor ackSensor = metrics.sensor(String.format("%s-%s-sensor", RECORD_ACK_SENSOR_PREFIX, ackTypeName));
                recordAcksSensorMap.put(ackTypeId, ackSensor);
                ackSensor.add(new Meter(
                    metrics.metricName(RECORD_ACK_RATE, METRICS_GROUP_NAME, "Rate of records acknowledged per acknowledgement type.", ACK_TYPE, ackTypeName),
                    metrics.metricName(RECORD_ACK_COUNT, METRICS_GROUP_NAME, "The number of records acknowledged per acknowledgement type.", ACK_TYPE, ackTypeName)));
            });

            // Share partition load time.
            partitionLoadTimeSensor = metrics.sensor(PARTITION_LOAD_TIME_SENSOR);
            partitionLoadTimeSensor.add(
                metrics.metricName(PARTITION_LOAD_TIME_AVG, METRICS_GROUP_NAME, "The average time in milliseconds to load the share partitions."),
                new Avg());
            partitionLoadTimeSensor.add(
                metrics.metricName(PARTITION_LOAD_TIME_MAX, METRICS_GROUP_NAME, "The maximum time in milliseconds to load the share partitions."),
                new Max());
        }

        /** Records one acknowledge request. */
        void shareAcknowledgement() {
            shareAcknowledgementSensor.record();
        }

        /**
         * Records one acknowledged record of the given type; types without a sensor are ignored.
         *
         * @param ackType acknowledgement type id
         */
        void recordAcknowledgement(byte ackType) {
            Sensor ackSensor = recordAcksSensorMap.get(ackType);
            if (ackSensor != null) {
                ackSensor.record();
            }
        }

        /**
         * Records the time elapsed since {@code start} as a partition load time.
         *
         * @param start value of {@code Time.hiResClockMs()} taken when loading began
         */
        void partitionLoadTime(long start) {
            long elapsedMs = time.hiResClockMs() - start;
            partitionLoadTimeSensor.record((double) elapsedMs);
        }
    }

    /**
     * One queued share fetch request: who asked, for which partitions and with which limits,
     * plus the future through which the response is delivered.
     */
    static class ShareFetchPartitionData {
        // Parameters forwarded to the replica manager read.
        private final FetchParams fetchParams;
        // Share group and member that issued the request.
        private final String groupId;
        private final String memberId;
        // Partitions the request wants to read from.
        private final List<TopicIdPartition> topicIdPartitions;
        // Completed with the per-partition response, or exceptionally on failure.
        private final CompletableFuture<Map<TopicIdPartition, ShareFetchResponseData.PartitionData>> future;
        // Byte limit per partition.
        private final Map<TopicIdPartition, Integer> partitionMaxBytes;

        public ShareFetchPartitionData(FetchParams fetchParams, String groupId, String memberId, List<TopicIdPartition> topicIdPartitions, CompletableFuture<Map<TopicIdPartition, ShareFetchResponseData.PartitionData>> future, Map<TopicIdPartition, Integer> partitionMaxBytes) {
            this.groupId = groupId;
            this.memberId = memberId;
            this.fetchParams = fetchParams;
            this.topicIdPartitions = topicIdPartitions;
            this.partitionMaxBytes = partitionMaxBytes;
            this.future = future;
        }
    }

    /**
     * Key of the share partition cache: a share group id together with a topic-id partition.
     * Both components are required to be non-null.
     */
    static class SharePartitionKey {
        private final String groupId;
        private final TopicIdPartition topicIdPartition;

        /**
         * @param groupId          share group id; must not be {@code null}
         * @param topicIdPartition partition; must not be {@code null}
         * @throws NullPointerException if either argument is {@code null}
         */
        public SharePartitionKey(String groupId, TopicIdPartition topicIdPartition) {
            this.groupId = Objects.requireNonNull(groupId);
            this.topicIdPartition = Objects.requireNonNull(topicIdPartition);
        }

        @Override
        public int hashCode() {
            return Objects.hash(this.groupId, this.topicIdPartition);
        }

        @Override
        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (obj == null || this.getClass() != obj.getClass()) {
                return false;
            }
            SharePartitionKey that = (SharePartitionKey)obj;
            return this.groupId.equals(that.groupId) && Objects.equals(this.topicIdPartition, that.topicIdPartition);
        }

        @Override
        public String toString() {
            // The quote opened before the group id is closed right after it.
            return "SharePartitionKey{groupId='" + this.groupId + "', topicIdPartition=" + this.topicIdPartition + '}';
        }
    }
}

