/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.clients.consumer;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.NoOffsetForPartitionException;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.clients.consumer.SubscriptionPattern;
import org.apache.kafka.clients.consumer.internals.AutoOffsetResetStrategy;
import org.apache.kafka.clients.consumer.internals.SubscriptionState;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.utils.LogContext;

/**
 * An in-memory {@link Consumer} implementation for tests.
 *
 * <p>Records, beginning/end offsets, partition metadata and exceptions are injected by the test through
 * the {@code add*}, {@code update*}, {@code set*} and {@code schedule*} methods; no network access happens.
 * Subscription and position bookkeeping is delegated to a {@link SubscriptionState}.
 *
 * <p>Most methods are {@code synchronized} on this instance. A few simple flag accessors
 * (for example {@link #shouldRebalance()} and {@link #lastPollTimeout()}) are not.
 *
 * @param <K> record key type
 * @param <V> record value type
 */
public class MockConsumer<K, V> implements Consumer<K, V> {
    /** Timeout passed by the no-argument {@link #close()} to {@link #close(Duration)}. */
    private static final Duration DEFAULT_CLOSE_TIMEOUT = Duration.ofMillis(30000L);

    /** Partition metadata per topic, as registered via {@link #updatePartitions(String, List)}. */
    private final Map<String, List<PartitionInfo>> partitions;
    private final SubscriptionState subscriptions;
    private final Map<TopicPartition, Long> beginningOffsets;
    private final Map<TopicPartition, Long> endOffsets;
    /** Offsets used when a partition is reset with a by-duration strategy. */
    private final Map<TopicPartition, Long> durationResetOffsets;
    private final Map<TopicPartition, OffsetAndMetadata> committed;
    /** Tasks run one per {@link #poll(Duration)} call, in scheduling order. */
    private final Queue<Runnable> pollTasks;
    private final Set<TopicPartition> paused;
    private final AtomicBoolean wakeup;
    /** Records buffered via {@link #addRecord(ConsumerRecord)} and not yet handed out by a poll. */
    private final Map<TopicPartition, List<ConsumerRecord<K, V>>> records;
    /** One-shot exception thrown by the next {@link #poll(Duration)}. */
    private KafkaException pollException;
    /** One-shot exception thrown by the next beginning/end offsets lookup. */
    private KafkaException offsetsException;
    private Duration lastPollTimeout;
    private boolean closed;
    private boolean shouldRebalance;
    private boolean telemetryDisabled = false;
    private Uuid clientInstanceId;
    private int injectTimeoutExceptionCounter;
    private final List<KafkaMetric> addedMetrics = new ArrayList<>();

    /**
     * Creates a mock consumer from the legacy reset-strategy enum.
     *
     * @param offsetResetStrategy strategy whose {@code toString()} form is parsed by
     *                            {@link AutoOffsetResetStrategy#fromString(String)}
     * @deprecated use {@link #MockConsumer(String)} instead
     */
    @Deprecated
    public MockConsumer(OffsetResetStrategy offsetResetStrategy) {
        this(AutoOffsetResetStrategy.fromString(offsetResetStrategy.toString()));
    }

    /**
     * Creates a mock consumer.
     *
     * @param offsetResetStrategy textual reset strategy, parsed by
     *                            {@link AutoOffsetResetStrategy#fromString(String)}
     */
    public MockConsumer(String offsetResetStrategy) {
        this(AutoOffsetResetStrategy.fromString(offsetResetStrategy));
    }

    private MockConsumer(AutoOffsetResetStrategy offsetResetStrategy) {
        this.subscriptions = new SubscriptionState(new LogContext(), offsetResetStrategy);
        this.partitions = new HashMap<>();
        this.records = new HashMap<>();
        this.paused = new HashSet<>();
        this.closed = false;
        this.beginningOffsets = new HashMap<>();
        this.endOffsets = new HashMap<>();
        this.durationResetOffsets = new HashMap<>();
        this.pollTasks = new LinkedList<>();
        this.pollException = null;
        this.wakeup = new AtomicBoolean(false);
        this.committed = new HashMap<>();
        this.shouldRebalance = false;
    }

    /** Returns the partitions currently assigned according to the subscription state. */
    @Override
    public synchronized Set<TopicPartition> assignment() {
        return this.subscriptions.assignedPartitions();
    }

    /**
     * Simulates a group rebalance to the given assignment.
     *
     * <p>All buffered records are discarded. The registered rebalance listener, if any, is told about
     * revoked partitions (only when there are some) before the new assignment is applied, and about
     * added partitions (always, possibly with an empty list) afterwards.
     *
     * @param newAssignment the partitions the consumer should own after the rebalance
     */
    public synchronized void rebalance(Collection<TopicPartition> newAssignment) {
        Set<TopicPartition> oldAssignmentSet = this.subscriptions.assignedPartitions();
        Set<TopicPartition> newAssignmentSet = new HashSet<>(newAssignment);
        List<TopicPartition> added = newAssignment.stream()
                .filter(tp -> !oldAssignmentSet.contains(tp))
                .collect(Collectors.toList());
        List<TopicPartition> removed = oldAssignmentSet.stream()
                .filter(tp -> !newAssignmentSet.contains(tp))
                .collect(Collectors.toList());
        this.records.clear();
        if (!removed.isEmpty()) {
            this.subscriptions.rebalanceListener().ifPresent(crl -> crl.onPartitionsRevoked(removed));
        }
        this.subscriptions.assignFromSubscribed(newAssignment);
        this.subscriptions.rebalanceListener().ifPresent(crl -> crl.onPartitionsAssigned(added));
    }

    /** Returns the current topic subscription as tracked by the subscription state. */
    @Override
    public synchronized Set<String> subscription() {
        return this.subscriptions.subscription();
    }

    /** Subscribes to the given topics without a rebalance listener. */
    @Override
    public synchronized void subscribe(Collection<String> topics) {
        this.subscribe(topics, Optional.empty());
    }

    /**
     * Subscribes to all known topics matching {@code pattern}.
     *
     * @throws IllegalArgumentException if {@code listener} is null
     */
    @Override
    public synchronized void subscribe(Pattern pattern, ConsumerRebalanceListener listener) {
        if (listener == null) {
            throw new IllegalArgumentException("RebalanceListener cannot be null");
        }
        this.subscribe(pattern, Optional.of(listener));
    }

    /** Subscribes to all known topics matching {@code pattern}, without a rebalance listener. */
    @Override
    public synchronized void subscribe(Pattern pattern) {
        this.subscribe(pattern, Optional.empty());
    }

    /**
     * Subscribes using a {@link SubscriptionPattern}.
     *
     * @throws IllegalArgumentException if {@code listener} is null, or {@code pattern} is null or empty
     */
    @Override
    public void subscribe(SubscriptionPattern pattern, ConsumerRebalanceListener listener) {
        if (listener == null) {
            throw new IllegalArgumentException("RebalanceListener cannot be null");
        }
        this.subscribe(pattern, Optional.of(listener));
    }

    /**
     * Subscribes using a {@link SubscriptionPattern}, without a rebalance listener.
     *
     * @throws IllegalArgumentException if {@code pattern} is null or empty
     */
    @Override
    public void subscribe(SubscriptionPattern pattern) {
        this.subscribe(pattern, Optional.empty());
    }

    // Synchronized like the other private subscribe helpers: it reads `closed` and mutates
    // `committed` and `subscriptions`, all of which are otherwise guarded by the instance monitor.
    private synchronized void subscribe(SubscriptionPattern pattern, Optional<ConsumerRebalanceListener> listener) {
        if (pattern == null || pattern.toString().isEmpty()) {
            throw new IllegalArgumentException("Topic pattern cannot be " + (pattern == null ? "null" : "empty"));
        }
        this.ensureNotClosed();
        this.committed.clear();
        this.subscriptions.subscribe(pattern, listener);
    }

    /**
     * Subscribes to the given topics with a rebalance listener.
     *
     * @throws IllegalArgumentException if {@code listener} is null
     */
    @Override
    public void subscribe(Collection<String> topics, ConsumerRebalanceListener listener) {
        if (listener == null) {
            throw new IllegalArgumentException("RebalanceListener cannot be null");
        }
        this.subscribe(topics, Optional.of(listener));
    }

    private synchronized void subscribe(Collection<String> topics, Optional<ConsumerRebalanceListener> listener) {
        this.ensureNotClosed();
        this.committed.clear();
        this.subscriptions.subscribe(new HashSet<>(topics), listener);
    }

    /**
     * Pattern subscription: besides recording the pattern, immediately subscribes to every topic
     * registered via {@link #updatePartitions(String, List)} that matches and is not already
     * subscribed, and assigns all partitions of those topics.
     */
    private synchronized void subscribe(Pattern pattern, Optional<ConsumerRebalanceListener> listener) {
        this.ensureNotClosed();
        this.committed.clear();
        this.subscriptions.subscribe(pattern, listener);

        Set<String> topicsToSubscribe = new HashSet<>();
        for (String topic : this.partitions.keySet()) {
            if (pattern.matcher(topic).matches() && !this.subscriptions.subscription().contains(topic)) {
                topicsToSubscribe.add(topic);
            }
        }
        this.subscriptions.subscribeFromPattern(topicsToSubscribe);

        Set<TopicPartition> assignedPartitions = new HashSet<>();
        for (String topic : topicsToSubscribe) {
            for (PartitionInfo info : this.partitions.get(topic)) {
                assignedPartitions.add(new TopicPartition(topic, info.partition()));
            }
        }
        this.subscriptions.assignFromSubscribed(assignedPartitions);
    }

    /** Records the metric so tests can inspect it through {@link #addedMetrics()}. */
    @Override
    public void registerMetricForSubscription(KafkaMetric metric) {
        this.addedMetrics.add(metric);
    }

    /** Removes a metric previously recorded by {@link #registerMetricForSubscription(KafkaMetric)}. */
    @Override
    public void unregisterMetricFromSubscription(KafkaMetric metric) {
        this.addedMetrics.remove(metric);
    }

    /** Manually assigns the given partitions and forgets all committed offsets. */
    @Override
    public synchronized void assign(Collection<TopicPartition> partitions) {
        this.ensureNotClosed();
        this.committed.clear();
        this.subscriptions.assignFromUser(new HashSet<>(partitions));
    }

    /** Drops the current subscription and forgets all committed offsets. */
    @Override
    public synchronized void unsubscribe() {
        this.ensureNotClosed();
        this.committed.clear();
        this.subscriptions.unsubscribe();
    }

    /**
     * Returns the buffered records that are at or beyond the current position of their partition.
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>run at most one task queued by {@link #schedulePollTask(Runnable)};</li>
     *   <li>if {@link #wakeup()} was called, clear the flag and throw {@link WakeupException};</li>
     *   <li>if an exception was set via {@link #setPollException(KafkaException)}, clear and throw it;</li>
     *   <li>initialise the position of every assigned partition that has no valid position;</li>
     *   <li>collect records of non-paused partitions, advancing the position past each returned record.</li>
     * </ol>
     * Buffered records of every non-paused partition are removed afterwards, whether or not they were
     * returned. Records of paused partitions stay buffered.
     *
     * @param timeout only remembered for {@link #lastPollTimeout()}; this method does not wait
     * @throws OffsetOutOfRangeException if a partition's position is below its configured beginning offset
     * @throws IllegalStateException     if the consumer is closed
     */
    @Override
    public synchronized ConsumerRecords<K, V> poll(Duration timeout) {
        this.ensureNotClosed();
        this.lastPollTimeout = timeout;

        // Hold the queue monitor (shared with schedulePollTask) for the whole task execution.
        synchronized (this.pollTasks) {
            Runnable task = this.pollTasks.poll();
            if (task != null) {
                task.run();
            }
        }

        if (this.wakeup.get()) {
            this.wakeup.set(false);
            throw new WakeupException();
        }

        if (this.pollException != null) {
            KafkaException exception = this.pollException;
            this.pollException = null;
            throw exception;
        }

        for (TopicPartition tp : this.subscriptions.assignedPartitions()) {
            if (!this.subscriptions.hasValidPosition(tp)) {
                this.updateFetchPosition(tp);
            }
        }

        Map<TopicPartition, List<ConsumerRecord<K, V>>> results = new HashMap<>();
        Map<TopicPartition, OffsetAndMetadata> nextOffsetAndMetadata = new HashMap<>();
        List<TopicPartition> toClear = new ArrayList<>();

        for (Map.Entry<TopicPartition, List<ConsumerRecord<K, V>>> entry : this.records.entrySet()) {
            TopicPartition tp = entry.getKey();
            if (this.subscriptions.isPaused(tp)) {
                continue;
            }
            Long beginningOffset = this.beginningOffsets.get(tp);
            for (ConsumerRecord<K, V> rec : entry.getValue()) {
                // Re-read on every iteration: the position moves as records are accepted below.
                long position = this.subscriptions.position(tp).offset;
                if (beginningOffset != null && beginningOffset > position) {
                    throw new OffsetOutOfRangeException(Collections.singletonMap(tp, position));
                }
                if (!this.assignment().contains(tp) || rec.offset() < position) {
                    continue;
                }
                results.computeIfAbsent(tp, partition -> new ArrayList<>()).add(rec);
                Metadata.LeaderAndEpoch leaderAndEpoch = new Metadata.LeaderAndEpoch(Optional.empty(), rec.leaderEpoch());
                SubscriptionState.FetchPosition newPosition =
                        new SubscriptionState.FetchPosition(rec.offset() + 1L, rec.leaderEpoch(), leaderAndEpoch);
                this.subscriptions.position(tp, newPosition);
                nextOffsetAndMetadata.put(tp, new OffsetAndMetadata(rec.offset() + 1L, rec.leaderEpoch(), ""));
            }
            // Removal is deferred so the map is not modified while it is being iterated.
            toClear.add(tp);
        }

        toClear.forEach(this.records::remove);
        return new ConsumerRecords<>(results, nextOffsetAndMetadata);
    }

    /**
     * Buffers a record to be returned by a later {@link #poll(Duration)}.
     *
     * @throws IllegalStateException if the consumer is closed or the record's partition is not assigned
     */
    public synchronized void addRecord(ConsumerRecord<K, V> record) {
        this.ensureNotClosed();
        TopicPartition tp = new TopicPartition(record.topic(), record.partition());
        Set<TopicPartition> currentAssigned = this.subscriptions.assignedPartitions();
        if (!currentAssigned.contains(tp)) {
            throw new IllegalStateException("Cannot add records for a partition that is not assigned to the consumer");
        }
        this.records.computeIfAbsent(tp, k -> new ArrayList<>()).add(record);
    }

    /** Sets an exception to be thrown (once) by the next {@link #poll(Duration)}. */
    public synchronized void setPollException(KafkaException exception) {
        this.pollException = exception;
    }

    /** Sets an exception to be thrown (once) by the next beginning- or end-offsets lookup. */
    public synchronized void setOffsetsException(KafkaException exception) {
        this.offsetsException = exception;
    }

    /**
     * Stores the given offsets as committed and, if a callback is supplied, invokes it
     * immediately on the calling thread with a null exception.
     */
    @Override
    public synchronized void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback) {
        this.ensureNotClosed();
        this.committed.putAll(offsets);
        if (callback != null) {
            callback.onComplete(offsets, null);
        }
    }

    /** Stores the given offsets as committed. */
    @Override
    public synchronized void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets) {
        this.commitAsync(offsets, null);
    }

    /** Commits all consumed offsets, without a callback. */
    @Override
    public synchronized void commitAsync() {
        this.commitAsync(null);
    }

    /** Commits all consumed offsets and invokes {@code callback} (if non-null) immediately. */
    @Override
    public synchronized void commitAsync(OffsetCommitCallback callback) {
        this.ensureNotClosed();
        this.commitAsync(this.subscriptions.allConsumed(), callback);
    }

    /** Commits all consumed offsets. */
    @Override
    public synchronized void commitSync() {
        this.commitSync(this.subscriptions.allConsumed());
    }

    /** Commits all consumed offsets; {@code timeout} is ignored. */
    @Override
    public synchronized void commitSync(Duration timeout) {
        this.commitSync(this.subscriptions.allConsumed());
    }

    /** Stores the given offsets as committed; {@code timeout} is ignored. */
    @Override
    public void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets, Duration timeout) {
        this.commitSync(offsets);
    }

    /** Moves the position of {@code partition} to {@code offset}. */
    @Override
    public synchronized void seek(TopicPartition partition, long offset) {
        this.ensureNotClosed();
        this.subscriptions.seek(partition, offset);
    }

    /**
     * Moves the position of {@code partition} to the offset carried by {@code offsetAndMetadata}.
     * Only the offset is used. Synchronized for parity with {@link #seek(TopicPartition, long)}.
     */
    @Override
    public synchronized void seek(TopicPartition partition, OffsetAndMetadata offsetAndMetadata) {
        this.ensureNotClosed();
        this.subscriptions.seek(partition, offsetAndMetadata.offset());
    }

    /**
     * Returns committed offsets for those requested partitions that have one.
     *
     * <p>Partitions without a stored commit are omitted. A partition with a stored commit that is
     * not currently assigned is reported with offset {@code 0}.
     */
    @Override
    public synchronized Map<TopicPartition, OffsetAndMetadata> committed(Set<TopicPartition> partitions) {
        this.ensureNotClosed();
        return partitions.stream()
                .filter(this.committed::containsKey)
                .collect(Collectors.toMap(
                        tp -> tp,
                        tp -> this.subscriptions.isAssigned(tp) ? this.committed.get(tp) : new OffsetAndMetadata(0L)));
    }

    /** Same as {@link #committed(Set)}; {@code timeout} is ignored. */
    @Override
    public synchronized Map<TopicPartition, OffsetAndMetadata> committed(Set<TopicPartition> partitions, Duration timeout) {
        return this.committed(partitions);
    }

    /**
     * Returns the current position of an assigned partition, initialising it first (from the
     * committed offset or via an offset reset) when no position is set yet.
     *
     * @throws IllegalArgumentException if {@code partition} is not assigned
     */
    @Override
    public synchronized long position(TopicPartition partition) {
        this.ensureNotClosed();
        if (!this.subscriptions.isAssigned(partition)) {
            throw new IllegalArgumentException("You can only check the position for partitions assigned to this consumer.");
        }
        SubscriptionState.FetchPosition position = this.subscriptions.position(partition);
        if (position == null) {
            this.updateFetchPosition(partition);
            position = this.subscriptions.position(partition);
        }
        return position.offset;
    }

    /** Same as {@link #position(TopicPartition)}; {@code timeout} is ignored. */
    @Override
    public synchronized long position(TopicPartition partition, Duration timeout) {
        return this.position(partition);
    }

    /** Requests an offset reset to the earliest offset for the given partitions. */
    @Override
    public synchronized void seekToBeginning(Collection<TopicPartition> partitions) {
        this.ensureNotClosed();
        this.subscriptions.requestOffsetReset(partitions, AutoOffsetResetStrategy.EARLIEST);
    }

    /** Adds or overwrites the beginning offsets used by resets and {@link #beginningOffsets(Collection)}. */
    public synchronized void updateBeginningOffsets(Map<TopicPartition, Long> newOffsets) {
        this.beginningOffsets.putAll(newOffsets);
    }

    /** Requests an offset reset to the latest offset for the given partitions. */
    @Override
    public synchronized void seekToEnd(Collection<TopicPartition> partitions) {
        this.ensureNotClosed();
        this.subscriptions.requestOffsetReset(partitions, AutoOffsetResetStrategy.LATEST);
    }

    /** Adds or overwrites the end offsets used by resets, {@link #endOffsets(Collection)} and lag. */
    public synchronized void updateEndOffsets(Map<TopicPartition, Long> newOffsets) {
        this.endOffsets.putAll(newOffsets);
    }

    /** Adds or overwrites the offsets used when a partition is reset with a by-duration strategy. */
    public synchronized void updateDurationOffsets(Map<TopicPartition, Long> newOffsets) {
        this.durationResetOffsets.putAll(newOffsets);
    }

    /** Makes {@link #clientInstanceId(Duration)} throw {@link IllegalStateException}. */
    public void disableTelemetry() {
        this.telemetryDisabled = true;
    }

    /**
     * Makes {@link #clientInstanceId(Duration)} throw {@link TimeoutException}.
     *
     * @param injectTimeoutExceptionCounter a positive value is the number of calls that time out;
     *                                      a negative value is never decremented, so every call times out;
     *                                      zero disables the injection
     */
    public void injectTimeoutException(int injectTimeoutExceptionCounter) {
        this.injectTimeoutExceptionCounter = injectTimeoutExceptionCounter;
    }

    /** Sets the value returned by {@link #clientInstanceId(Duration)}. */
    public void setClientInstanceId(Uuid instanceId) {
        this.clientInstanceId = instanceId;
    }

    /**
     * Returns the configured client instance id; {@code timeout} is ignored.
     *
     * @throws IllegalStateException         if telemetry was disabled
     * @throws UnsupportedOperationException if no instance id was set
     * @throws TimeoutException              while timeout injection is active
     */
    @Override
    public Uuid clientInstanceId(Duration timeout) {
        if (this.telemetryDisabled) {
            throw new IllegalStateException();
        }
        if (this.clientInstanceId == null) {
            throw new UnsupportedOperationException("clientInstanceId not set");
        }
        if (this.injectTimeoutExceptionCounter != 0) {
            // Only positive counters count down; a negative counter keeps timing out.
            if (this.injectTimeoutExceptionCounter > 0) {
                --this.injectTimeoutExceptionCounter;
            }
            throw new TimeoutException();
        }
        return this.clientInstanceId;
    }

    /** Always returns an empty map. */
    @Override
    public synchronized Map<MetricName, ? extends Metric> metrics() {
        this.ensureNotClosed();
        return Collections.emptyMap();
    }

    /** Returns the registered partitions of {@code topic}, or an empty list for an unknown topic. */
    @Override
    public synchronized List<PartitionInfo> partitionsFor(String topic) {
        this.ensureNotClosed();
        return this.partitions.getOrDefault(topic, Collections.emptyList());
    }

    /** Returns the internal topic-to-partitions map itself (not a copy). */
    @Override
    public synchronized Map<String, List<PartitionInfo>> listTopics() {
        this.ensureNotClosed();
        return this.partitions;
    }

    /** Registers (or replaces) the partition metadata of {@code topic}. */
    public synchronized void updatePartitions(String topic, List<PartitionInfo> partitions) {
        this.ensureNotClosed();
        this.partitions.put(topic, partitions);
    }

    /** Pauses the given partitions; paused partitions are skipped by {@link #poll(Duration)}. */
    @Override
    public synchronized void pause(Collection<TopicPartition> partitions) {
        for (TopicPartition partition : partitions) {
            this.subscriptions.pause(partition);
            this.paused.add(partition);
        }
    }

    /** Resumes the given partitions. */
    @Override
    public synchronized void resume(Collection<TopicPartition> partitions) {
        for (TopicPartition partition : partitions) {
            this.subscriptions.resume(partition);
            this.paused.remove(partition);
        }
    }

    /**
     * Not supported by this mock.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public synchronized Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch) {
        throw new UnsupportedOperationException("Not implemented yet.");
    }

    /**
     * Returns the configured beginning offset of every requested partition.
     *
     * @throws KafkaException        the exception set via {@link #setOffsetsException(KafkaException)}, once
     * @throws IllegalStateException if any requested partition has no beginning offset configured
     */
    @Override
    public synchronized Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> partitions) {
        if (this.offsetsException != null) {
            KafkaException exception = this.offsetsException;
            this.offsetsException = null;
            throw exception;
        }
        Map<TopicPartition, Long> result = new HashMap<>();
        for (TopicPartition tp : partitions) {
            Long beginningOffset = this.beginningOffsets.get(tp);
            if (beginningOffset == null) {
                throw new IllegalStateException("The partition " + tp + " does not have a beginning offset.");
            }
            result.put(tp, beginningOffset);
        }
        return result;
    }

    /**
     * Returns the configured end offset of every requested partition.
     *
     * @throws KafkaException        the exception set via {@link #setOffsetsException(KafkaException)}, once
     * @throws IllegalStateException if any requested partition has no end offset configured
     */
    @Override
    public synchronized Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions) {
        if (this.offsetsException != null) {
            KafkaException exception = this.offsetsException;
            this.offsetsException = null;
            throw exception;
        }
        Map<TopicPartition, Long> result = new HashMap<>();
        for (TopicPartition tp : partitions) {
            Long endOffset = this.endOffsets.get(tp);
            if (endOffset == null) {
                throw new IllegalStateException("The partition " + tp + " does not have an end offset.");
            }
            result.put(tp, endOffset);
        }
        return result;
    }

    /** Closes the consumer using the default close timeout. */
    @Override
    public void close() {
        this.close(DEFAULT_CLOSE_TIMEOUT);
    }

    /** Marks the consumer as closed; {@code timeout} is ignored. */
    @Override
    public synchronized void close(Duration timeout) {
        this.closed = true;
    }

    /** Returns whether {@link #close(Duration)} has been called. */
    public synchronized boolean closed() {
        return this.closed;
    }

    /** Makes the next {@link #poll(Duration)} throw {@link WakeupException}. */
    @Override
    public synchronized void wakeup() {
        this.wakeup.set(true);
    }

    /**
     * Queues a task to be run at the start of a future {@link #poll(Duration)}; each poll runs at
     * most one task, in scheduling order.
     */
    public synchronized void schedulePollTask(Runnable task) {
        synchronized (this.pollTasks) {
            this.pollTasks.add(task);
        }
    }

    /** Queues a task that does nothing, consuming one poll's task slot. */
    public synchronized void scheduleNopPollTask() {
        this.schedulePollTask(() -> {});
    }

    /** Returns an immutable snapshot of the partitions paused via {@link #pause(Collection)}. */
    @Override
    public synchronized Set<TopicPartition> paused() {
        return Set.copyOf(this.paused);
    }

    private void ensureNotClosed() {
        if (this.closed) {
            throw new IllegalStateException("This consumer has already been closed.");
        }
    }

    /**
     * Gives {@code tp} a position: performs a pending reset if one is requested, otherwise seeks to
     * the committed offset, or requests and performs a reset when nothing is committed.
     */
    private void updateFetchPosition(TopicPartition tp) {
        if (this.subscriptions.isOffsetResetNeeded(tp)) {
            this.resetOffsetPosition(tp);
        } else if (!this.committed.containsKey(tp)) {
            this.subscriptions.requestOffsetReset(tp);
            this.resetOffsetPosition(tp);
        } else {
            this.subscriptions.seek(tp, this.committed.get(tp).offset());
        }
    }

    /**
     * Seeks {@code tp} to the offset configured for its reset strategy.
     *
     * @throws IllegalStateException         if the offset required by the strategy was never configured
     * @throws NoOffsetForPartitionException if the strategy is none of earliest, latest or by-duration
     */
    private void resetOffsetPosition(TopicPartition tp) {
        Long offset;
        AutoOffsetResetStrategy strategy = this.subscriptions.resetStrategy(tp);
        if (strategy == AutoOffsetResetStrategy.EARLIEST) {
            offset = this.beginningOffsets.get(tp);
            if (offset == null) {
                throw new IllegalStateException("MockConsumer didn't have beginning offset specified, but tried to seek to beginning");
            }
        } else if (strategy == AutoOffsetResetStrategy.LATEST) {
            offset = this.endOffsets.get(tp);
            if (offset == null) {
                throw new IllegalStateException("MockConsumer didn't have end offset specified, but tried to seek to end");
            }
        } else if (strategy.type() == AutoOffsetResetStrategy.StrategyType.BY_DURATION) {
            offset = this.durationResetOffsets.get(tp);
            if (offset == null) {
                throw new IllegalStateException("MockConsumer didn't have duration offset specified, but tried to seek to timestamp");
            }
        } else {
            throw new NoOffsetForPartitionException(tp);
        }
        this.seek(tp, offset);
    }

    /** Same as {@link #partitionsFor(String)}; {@code timeout} is ignored. */
    @Override
    public List<PartitionInfo> partitionsFor(String topic, Duration timeout) {
        return this.partitionsFor(topic);
    }

    /** Same as {@link #listTopics()}; {@code timeout} is ignored. */
    @Override
    public Map<String, List<PartitionInfo>> listTopics(Duration timeout) {
        return this.listTopics();
    }

    /** Same as {@link #offsetsForTimes(Map)}; {@code timeout} is ignored. */
    @Override
    public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch, Duration timeout) {
        return this.offsetsForTimes(timestampsToSearch);
    }

    /** Same as {@link #beginningOffsets(Collection)}; {@code timeout} is ignored. */
    @Override
    public Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> partitions, Duration timeout) {
        return this.beginningOffsets(partitions);
    }

    /** Same as {@link #endOffsets(Collection)}; {@code timeout} is ignored. */
    @Override
    public Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions, Duration timeout) {
        return this.endOffsets(partitions);
    }

    /**
     * Returns the configured end offset minus the current position, or a lag of zero when no end
     * offset has been configured for the partition.
     */
    @Override
    public OptionalLong currentLag(TopicPartition topicPartition) {
        if (this.endOffsets.containsKey(topicPartition)) {
            return OptionalLong.of(this.endOffsets.get(topicPartition) - this.position(topicPartition));
        }
        return OptionalLong.of(0L);
    }

    /** Returns fixed placeholder group metadata. */
    @Override
    public ConsumerGroupMetadata groupMetadata() {
        return new ConsumerGroupMetadata("dummy.group.id", 1, "1", Optional.empty());
    }

    /** Sets the flag reported by {@link #shouldRebalance()}; no rebalance is performed. */
    @Override
    public void enforceRebalance() {
        this.enforceRebalance(null);
    }

    /** Sets the flag reported by {@link #shouldRebalance()}; {@code reason} is ignored. */
    @Override
    public void enforceRebalance(String reason) {
        this.shouldRebalance = true;
    }

    /** Returns whether a rebalance was requested since the last {@link #resetShouldRebalance()}. */
    public boolean shouldRebalance() {
        return this.shouldRebalance;
    }

    /** Clears the flag set by {@link #enforceRebalance()}. */
    public void resetShouldRebalance() {
        this.shouldRebalance = false;
    }

    /** Returns the timeout passed to the most recent {@link #poll(Duration)}, or null if never polled. */
    public Duration lastPollTimeout() {
        return this.lastPollTimeout;
    }

    /** Returns a read-only view of the metrics registered via {@link #registerMetricForSubscription(KafkaMetric)}. */
    public List<KafkaMetric> addedMetrics() {
        return Collections.unmodifiableList(this.addedMetrics);
    }
}

