/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals.assignment;

import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.SortedMap;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.stream.Collectors;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.assignment.KafkaStreamsState;
import org.apache.kafka.streams.processor.assignment.ProcessId;
import org.apache.kafka.streams.state.HostInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Value-holder implementation of {@link KafkaStreamsState}.
 *
 * <p>The collections handed to the constructor are wrapped in unmodifiable views (they are not
 * copied). Lag information is optional: when {@code taskLagTotals} is empty, every lag-related
 * accessor ({@link #lagFor(TaskId)}, {@link #prevTasksByLag(String)},
 * {@link #statefulTasksToLagSums()}) throws {@link UnsupportedOperationException}.
 */
public class DefaultKafkaStreamsState implements KafkaStreamsState {
    private static final Logger LOG = LoggerFactory.getLogger(DefaultKafkaStreamsState.class);
    private final ProcessId processId;
    private final int numProcessingThreads;
    private final Map<String, String> clientTags;
    private final SortedSet<TaskId> previousActiveTasks;
    private final SortedSet<TaskId> previousStandbyTasks;
    private final SortedMap<String, Set<TaskId>> taskIdsByConsumer;
    private final Optional<HostInfo> hostInfo;
    private final Optional<Map<TaskId, Long>> taskLagTotals;
    private final Optional<String> rackId;

    /**
     * Creates a new state snapshot.
     *
     * @param processId            identifier returned by {@link #processId()}
     * @param numProcessingThreads value returned by {@link #numProcessingThreads()}
     * @param clientTags           client tags; exposed through an unmodifiable view
     * @param previousActiveTasks  previous active tasks; exposed through an unmodifiable view
     * @param previousStandbyTasks previous standby tasks; exposed through an unmodifiable view
     * @param taskIdsByConsumer    task ids keyed by consumer client id; the map is wrapped in an
     *                             unmodifiable view (the value sets are stored as given)
     * @param hostInfo             optional host info, stored as given
     * @param taskLagTotals        optional per-task lag totals; empty disables all lag accessors
     * @param rackId               optional rack id, stored as given
     */
    public DefaultKafkaStreamsState(ProcessId processId, int numProcessingThreads, Map<String, String> clientTags, SortedSet<TaskId> previousActiveTasks, SortedSet<TaskId> previousStandbyTasks, SortedMap<String, Set<TaskId>> taskIdsByConsumer, Optional<HostInfo> hostInfo, Optional<Map<TaskId, Long>> taskLagTotals, Optional<String> rackId) {
        this.processId = processId;
        this.numProcessingThreads = numProcessingThreads;
        this.clientTags = Collections.unmodifiableMap(clientTags);
        this.previousActiveTasks = Collections.unmodifiableSortedSet(previousActiveTasks);
        this.previousStandbyTasks = Collections.unmodifiableSortedSet(previousStandbyTasks);
        this.taskIdsByConsumer = Collections.unmodifiableSortedMap(taskIdsByConsumer);
        this.hostInfo = hostInfo;
        this.taskLagTotals = taskLagTotals;
        this.rackId = rackId;
    }

    @Override
    public ProcessId processId() {
        return this.processId;
    }

    @Override
    public int numProcessingThreads() {
        return this.numProcessingThreads;
    }

    /**
     * @return a new sorted set holding the keys of the consumer-to-tasks map; modifying the
     *         returned set does not affect this object
     */
    @Override
    public SortedSet<String> consumerClientIds() {
        return new TreeSet<>(this.taskIdsByConsumer.keySet());
    }

    @Override
    public SortedSet<TaskId> previousActiveTasks() {
        return this.previousActiveTasks;
    }

    @Override
    public SortedSet<TaskId> previousStandbyTasks() {
        return this.previousStandbyTasks;
    }

    /**
     * Looks up the recorded total lag for a task.
     *
     * @param task the task to look up
     * @return the lag total stored for {@code task}
     * @throws UnsupportedOperationException if no lag totals were supplied at construction
     * @throws IllegalStateException         if the lag totals contain no value for {@code task}
     */
    @Override
    public long lagFor(TaskId task) {
        final Map<TaskId, Long> lagTotals = this.requireTaskLagTotals("lagFor");
        final Long totalLag = lagTotals.get(task);
        if (totalLag == null) {
            LOG.error("Task lag lookup failed: {} not in {}", task, Arrays.toString(lagTotals.keySet().toArray()));
            throw new IllegalStateException("Tried to lookup lag for unknown task " + task);
        }
        return totalLag;
    }

    /**
     * Returns the tasks recorded for the given consumer that also have a lag total, ordered by
     * ascending lag and then by {@link TaskId#compareTo}.
     *
     * <p>Tasks recorded for the consumer but absent from the lag totals are skipped (and logged
     * at debug level). A consumer id with no recorded tasks yields an empty set.
     *
     * @param consumerClientId the consumer client id whose tasks should be returned
     * @return a new sorted set of the consumer's tasks that have a known lag
     * @throws UnsupportedOperationException if no lag totals were supplied at construction
     */
    @Override
    public SortedSet<TaskId> prevTasksByLag(String consumerClientId) {
        final Map<TaskId, Long> lagTotals = this.requireTaskLagTotals("prevTasksByLag");
        final SortedSet<TaskId> prevTasksByLag = new TreeSet<>(
            Comparator.<TaskId>comparingLong(this::lagFor).thenComparing(TaskId::compareTo));
        // Typed lookup instead of a raw-typed cast; the fallback is only iterated, never mutated.
        final Set<TaskId> prevOwnedStatefulTasks =
            this.taskIdsByConsumer.getOrDefault(consumerClientId, Collections.emptySet());
        for (final TaskId task : prevOwnedStatefulTasks) {
            // Only tasks with a known lag can be ordered by the comparator above.
            if (lagTotals.containsKey(task)) {
                prevTasksByLag.add(task);
            } else {
                LOG.debug("Skipping previous task {} since it's not part of the current assignment", task);
            }
        }
        return prevTasksByLag;
    }

    /**
     * @return a new map from every task in the lag totals to its lag as reported by
     *         {@link #lagFor(TaskId)}
     * @throws UnsupportedOperationException if no lag totals were supplied at construction
     */
    @Override
    public Map<TaskId, Long> statefulTasksToLagSums() {
        final Map<TaskId, Long> lagTotals = this.requireTaskLagTotals("statefulTasksToLagSums");
        return lagTotals.keySet().stream().collect(Collectors.toMap(taskId -> taskId, this::lagFor));
    }

    @Override
    public Optional<HostInfo> hostInfo() {
        return this.hostInfo;
    }

    @Override
    public Map<String, String> clientTags() {
        return this.clientTags;
    }

    @Override
    public Optional<String> rackId() {
        return this.rackId;
    }

    /**
     * Returns the lag totals, or logs an error and throws when none were supplied.
     *
     * @param caller name of the public method performing the check; used in the log message
     * @return the lag totals map
     * @throws UnsupportedOperationException if no lag totals were supplied at construction
     */
    private Map<TaskId, Long> requireTaskLagTotals(String caller) {
        if (this.taskLagTotals.isEmpty()) {
            LOG.error("{} was called on a KafkaStreamsState {} that does not support lag computations.", caller, this.processId);
            throw new UnsupportedOperationException("Lag computation was not requested for KafkaStreamsState with process " + this.processId);
        }
        return this.taskLagTotals.get();
    }
}

