/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals;

import java.util.Collection;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.internals.StreamsConfigUtils;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.Task;
import org.slf4j.Logger;

public class TaskExecutionMetadata {
    private static final long CONSTANT_BACKOFF_MS = 5000L;
    private static final long NOT_READY_LOG_INTERVAL_MS = 120000L;
    private final boolean hasNamedTopologies;
    private final Set<String> pausedTopologies;
    private final StreamsConfigUtils.ProcessingMode processingMode;
    private final Collection<Task> successfullyProcessed = new HashSet<Task>();
    private final ConcurrentHashMap<String, NamedTopologyMetadata> topologyNameToErrorMetadata = new ConcurrentHashMap();
    private final ConcurrentHashMap<TaskId, Long> taskToLastNotReadyLogTime = new ConcurrentHashMap();
    private final Logger log;

    public TaskExecutionMetadata(Set<String> allTopologyNames, Set<String> pausedTopologies, StreamsConfigUtils.ProcessingMode processingMode) {
        this.hasNamedTopologies = allTopologyNames.size() != 1 || !allTopologyNames.contains("__UNNAMED_TOPOLOGY__");
        this.pausedTopologies = pausedTopologies;
        this.processingMode = processingMode;
        this.log = new LogContext("").logger(TaskExecutionMetadata.class);
    }

    public boolean hasNamedTopologies() {
        return this.hasNamedTopologies;
    }

    public StreamsConfigUtils.ProcessingMode processingMode() {
        return this.processingMode;
    }

    public boolean canProcessTask(Task task, long now) {
        String logMessage;
        boolean canProcess;
        boolean taskWasReady;
        String topologyName = task.id().topologyName();
        boolean bl = taskWasReady = !this.taskToLastNotReadyLogTime.containsKey(task.id());
        if (!this.hasNamedTopologies) {
            boolean bl2 = canProcess = !this.pausedTopologies.contains("__UNNAMED_TOPOLOGY__");
            logMessage = canProcess ? String.format("Task %s can be processed: topology is not paused", task.id()) : String.format("Task %s can't be processed: topology is paused", task.id());
        } else if (this.pausedTopologies.contains(topologyName)) {
            canProcess = false;
            logMessage = String.format("Task %s can't be processed: topology '%s' is paused", task.id(), topologyName);
        } else {
            NamedTopologyMetadata metadata = this.topologyNameToErrorMetadata.get(topologyName);
            canProcess = metadata == null || metadata.canProcess() && metadata.canProcessTask(task, now);
            logMessage = canProcess ? String.format("Task %s can be processed for named topology '%s'", task.id(), topologyName) : String.format("Task %s can't be processed for named topology '%s'", task.id(), topologyName);
        }
        if (!canProcess) {
            if (taskWasReady) {
                this.taskToLastNotReadyLogTime.put(task.id(), now);
            } else {
                this.maybeLogNotReady(task.id(), now, logMessage);
            }
        } else {
            this.taskToLastNotReadyLogTime.remove(task.id());
            this.log.trace(logMessage);
        }
        return canProcess;
    }

    private void maybeLogNotReady(TaskId taskId, long now, String logMessage) {
        Long lastLogTime = this.taskToLastNotReadyLogTime.get(taskId);
        if (lastLogTime == null) {
            return;
        }
        long timeSinceLastLog = now - lastLogTime;
        if (timeSinceLastLog >= 120000L) {
            this.log.info("Task {} is not ready to process: {}", (Object)taskId, (Object)(logMessage != null ? logMessage : "Task cannot be processed"));
            this.taskToLastNotReadyLogTime.put(taskId, now);
        }
    }

    public boolean canPunctuateTask(Task task) {
        String topologyName = task.id().topologyName();
        if (topologyName == null) {
            return !this.pausedTopologies.contains("__UNNAMED_TOPOLOGY__");
        }
        return !this.pausedTopologies.contains(topologyName);
    }

    public void registerTaskError(Task task, Throwable t, long now) {
        if (this.hasNamedTopologies) {
            String topologyName = task.id().topologyName();
            this.topologyNameToErrorMetadata.computeIfAbsent(topologyName, n -> new NamedTopologyMetadata(topologyName)).registerTaskError(task, t, now);
        }
    }

    Collection<Task> successfullyProcessed() {
        return this.successfullyProcessed;
    }

    public void addToSuccessfullyProcessed(Task task) {
        this.successfullyProcessed.add(task);
    }

    void removeTaskFromSuccessfullyProcessedBeforeClosing(Task task) {
        this.successfullyProcessed.remove(task);
    }

    void clearSuccessfullyProcessed() {
        this.successfullyProcessed.clear();
    }

    void removeTaskFromNotReadyTracking(Task task) {
        TaskId taskId = task.id();
        this.taskToLastNotReadyLogTime.remove(taskId);
    }

    private class NamedTopologyMetadata {
        private final Logger log;
        private final Map<TaskId, Long> tasksToErrorTime = new ConcurrentHashMap<TaskId, Long>();

        public NamedTopologyMetadata(String topologyName) {
            LogContext logContext = new LogContext(String.format("topology-name [%s] ", topologyName));
            this.log = logContext.logger(NamedTopologyMetadata.class);
        }

        public boolean canProcess() {
            return true;
        }

        public boolean canProcessTask(Task task, long now) {
            Long errorTime = this.tasksToErrorTime.get(task.id());
            if (errorTime == null) {
                return true;
            }
            if (now - errorTime > 5000L) {
                this.log.info("End backoff for task {} at t={}", (Object)task.id(), (Object)now);
                this.tasksToErrorTime.remove(task.id());
                if (this.tasksToErrorTime.isEmpty()) {
                    TaskExecutionMetadata.this.topologyNameToErrorMetadata.remove(task.id().topologyName());
                }
                return true;
            }
            this.log.debug("Skipping processing for unhealthy task {} at t={}", (Object)task.id(), (Object)now);
            return false;
        }

        public synchronized void registerTaskError(Task task, Throwable t, long now) {
            this.log.info("Begin backoff for unhealthy task {} at t={} due to {}", new Object[]{task.id(), now, t.getClass().getName()});
            this.tasksToErrorTime.put(task.id(), now);
        }
    }
}

