/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.connect.runtime;

import java.util.Collection;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.connect.source.SourceRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Tracks records that have been submitted but whose offsets have not yet been collected for
 * committing, grouped by source partition.
 *
 * <p>Within each partition, records are kept in submission order. An offset becomes committable
 * only once the record it belongs to, and every record queued before it for the same partition,
 * has been acknowledged via {@link SubmittedRecord#ack()}.
 *
 * <p>Only the unacknowledged-message counter and the drain latch are guarded by this instance's
 * monitor. The per-partition map is a plain {@link HashMap} that {@link #submit(SourceRecord)},
 * {@link #committableOffsets()} and {@link SubmittedRecord#drop()} access without locking.
 * NOTE(review): presumably those three are only ever invoked from a single thread - confirm
 * against callers.
 */
class SubmittedRecords {
    private static final Logger log = LoggerFactory.getLogger(SubmittedRecords.class);

    // Submitted-but-not-yet-committable records, keyed by source partition, oldest first.
    final Map<Map<String, Object>, Deque<SubmittedRecord>> records = new HashMap<>();
    // Number of records submitted and neither acked nor dropped; guarded by "this".
    private int numUnackedMessages = 0;
    // Latch created by the most recent awaitAllMessages call, if any; guarded by "this".
    private CountDownLatch messageDrainLatch;

    /**
     * Starts tracking a source record, using its source partition and source offset.
     *
     * @param record the record to track
     * @return a handle that can later be {@linkplain SubmittedRecord#ack() acked} or
     *         {@linkplain SubmittedRecord#drop() dropped}
     */
    public SubmittedRecord submit(SourceRecord record) {
        return submit(record.sourcePartition(), record.sourceOffset());
    }

    /**
     * Starts tracking a record identified by the given partition and offset. The record is
     * appended to the end of its partition's queue and counted as unacknowledged.
     *
     * @param partition the source partition of the record
     * @param offset the source offset of the record
     * @return a handle for the newly tracked record
     */
    SubmittedRecord submit(Map<String, Object> partition, Map<String, Object> offset) {
        SubmittedRecord result = new SubmittedRecord(partition, offset);
        records.computeIfAbsent(result.partition(), p -> new LinkedList<>()).add(result);
        synchronized (this) {
            numUnackedMessages++;
        }
        return result;
    }

    /**
     * Removes every acknowledged record found at the head of each partition's queue and reports
     * the offset of the last one removed per partition, together with some queue statistics.
     * Partitions whose queue ends up empty are forgotten.
     *
     * @return the offsets that can now be committed and statistics about what remains queued
     */
    public CommittableOffsets committableOffsets() {
        Map<Map<String, Object>, Map<String, Object>> offsets = new HashMap<>();
        int totalCommittableMessages = 0;
        int totalUncommittableMessages = 0;
        int largestDequeSize = 0;
        Map<String, Object> largestDequePartition = null;

        for (Map.Entry<Map<String, Object>, Deque<SubmittedRecord>> entry : records.entrySet()) {
            Map<String, Object> partition = entry.getKey();
            Deque<SubmittedRecord> queuedRecords = entry.getValue();
            int initialDequeSize = queuedRecords.size();

            if (canCommitHead(queuedRecords)) {
                // Drains the acked prefix of the queue; the offset itself may legitimately be null.
                offsets.put(partition, committableOffset(queuedRecords));
            }

            // Whatever is still queued is blocked behind an unacked record.
            int uncommittableMessages = queuedRecords.size();
            totalCommittableMessages += initialDequeSize - uncommittableMessages;
            totalUncommittableMessages += uncommittableMessages;

            if (uncommittableMessages > largestDequeSize) {
                largestDequeSize = uncommittableMessages;
                largestDequePartition = partition;
            }
        }

        // Clear out all empty deques so that the map does not grow with every partition ever seen.
        records.values().removeIf(Collection::isEmpty);

        return new CommittableOffsets(
                offsets,
                totalCommittableMessages,
                totalUncommittableMessages,
                records.size(),
                largestDequeSize,
                largestDequePartition);
    }

    /**
     * Waits until every record that is unacknowledged at the time of the call has been acked or
     * dropped, or until the timeout elapses.
     *
     * <p>NOTE(review): if the waiting thread is interrupted this returns {@code false} without
     * restoring the thread's interrupt status. That behaviour is kept as-is because callers may
     * rely on it - confirm before changing.
     *
     * @param timeout the maximum time to wait
     * @param timeUnit the unit of {@code timeout}
     * @return {@code true} if the latch reached zero in time; {@code false} on timeout or interrupt
     */
    public boolean awaitAllMessages(long timeout, TimeUnit timeUnit) {
        // Keep a local reference so that await() runs outside the monitor on exactly the latch
        // created here, even if the field is replaced by a later call.
        CountDownLatch latch;
        synchronized (this) {
            latch = new CountDownLatch(numUnackedMessages);
            messageDrainLatch = latch;
        }
        try {
            return latch.await(timeout, timeUnit);
        } catch (InterruptedException e) {
            return false;
        }
    }

    /**
     * Polls acknowledged records off the head of the queue until the head is unacked or the queue
     * is empty.
     *
     * @return the offset of the last record removed, or {@code null} if nothing was removed
     */
    private Map<String, Object> committableOffset(Deque<SubmittedRecord> queuedRecords) {
        Map<String, Object> result = null;
        while (canCommitHead(queuedRecords)) {
            result = queuedRecords.poll().offset();
        }
        return result;
    }

    /** Returns whether the queue is non-empty and its oldest record has been acknowledged. */
    private boolean canCommitHead(Deque<SubmittedRecord> queuedRecords) {
        SubmittedRecord head = queuedRecords.peek();
        return head != null && head.acked();
    }

    /** Records that one message is no longer outstanding and notifies any pending drain latch. */
    private synchronized void messageAcked() {
        numUnackedMessages--;
        if (messageDrainLatch != null) {
            messageDrainLatch.countDown();
        }
    }

    /**
     * Handle for a single record tracked by the enclosing {@link SubmittedRecords}.
     */
    public class SubmittedRecord {
        private final Map<String, Object> partition;
        private final Map<String, Object> offset;
        // Flipped at most once, by whichever of ack()/drop() first accounts for this record.
        private final AtomicBoolean acked;

        public SubmittedRecord(Map<String, Object> partition, Map<String, Object> offset) {
            this.partition = partition;
            this.offset = offset;
            this.acked = new AtomicBoolean(false);
        }

        /**
         * Marks this record as acknowledged so that its offset can become committable. Only the
         * first call has any effect. This touches nothing but the atomic flag and the
         * synchronized counter of the enclosing instance.
         */
        public void ack() {
            if (acked.compareAndSet(false, true)) {
                messageAcked();
            }
        }

        /**
         * Removes this record from its partition's queue so that it no longer influences which
         * offsets are committable. If several queued entries are equal to this one, the one
         * closest to the tail is removed.
         *
         * @return {@code true} if the record was found and removed; {@code false} otherwise
         */
        public boolean drop() {
            Deque<SubmittedRecord> deque = records.get(partition);
            if (deque == null) {
                log.warn("Attempted to remove record from submitted queue for partition {}, but no records with that partition appear to have been submitted", partition);
                return false;
            }
            boolean result = deque.removeLastOccurrence(this);
            if (deque.isEmpty()) {
                records.remove(partition);
            }
            if (result) {
                // Share the once-only guard with ack(): a record that is both acked and dropped
                // (in either order) must reduce the outstanding count exactly once. Otherwise the
                // counter can go negative and the drain latch can be released too early.
                if (acked.compareAndSet(false, true)) {
                    messageAcked();
                }
            } else {
                log.warn("Attempted to remove record from submitted queue for partition {}, but the record has not been submitted or has already been removed", partition);
            }
            return result;
        }

        private boolean acked() {
            return acked.get();
        }

        private Map<String, Object> partition() {
            return partition;
        }

        private Map<String, Object> offset() {
            return offset;
        }
    }

    /**
     * Snapshot produced by {@link SubmittedRecords#committableOffsets()}.
     *
     * @param offsets committable offset per source partition; exposed as an unmodifiable view
     * @param numCommittableMessages number of records whose offsets were collected
     * @param numUncommittableMessages number of records still queued
     * @param numDeques number of partitions that still have queued records
     * @param largestDequeSize largest number of still-queued records in any single partition
     * @param largestDequePartition the partition with that many queued records, or {@code null}
     *                              if no partition has queued records
     */
    record CommittableOffsets(
            Map<Map<String, Object>, Map<String, Object>> offsets,
            int numCommittableMessages,
            int numUncommittableMessages,
            int numDeques,
            int largestDequeSize,
            Map<String, Object> largestDequePartition) {

        /** A snapshot with no offsets and all counts set to zero. */
        public static final CommittableOffsets EMPTY = new CommittableOffsets(Collections.emptyMap(), 0, 0, 0, 0, null);

        CommittableOffsets {
            // Wrapped in a read-only view (not copied) so holders of the snapshot cannot mutate it.
            offsets = Collections.unmodifiableMap(offsets);
        }

        /** Returns whether any records were still queued when this snapshot was taken. */
        public boolean hasPending() {
            return numUncommittableMessages > 0;
        }

        /** Returns whether this snapshot carries no offsets and counts no records at all. */
        public boolean isEmpty() {
            return numCommittableMessages == 0 && numUncommittableMessages == 0 && offsets.isEmpty();
        }

        /**
         * Combines this snapshot with a more recent one.
         *
         * <p>Offsets from {@code newerOffsets} replace offsets for the same partition in this
         * snapshot, the committable counts are added together, and all remaining statistics are
         * taken from {@code newerOffsets}.
         *
         * @param newerOffsets the more recent snapshot
         * @return a new snapshot; neither input is modified
         */
        public CommittableOffsets updatedWith(CommittableOffsets newerOffsets) {
            Map<Map<String, Object>, Map<String, Object>> merged = new HashMap<>(offsets);
            merged.putAll(newerOffsets.offsets);
            return new CommittableOffsets(
                    merged,
                    numCommittableMessages + newerOffsets.numCommittableMessages,
                    newerOffsets.numUncommittableMessages,
                    newerOffsets.numDeques,
                    newerOffsets.largestDequeSize,
                    newerOffsets.largestDequePartition);
        }
    }
}

