package net.bluemind.core.task.service.internal;

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.MoreExecutors;
import com.typesafe.config.Config;
import io.netty.util.concurrent.FastThreadLocal;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.Vertx;
import io.vertx.core.eventbus.Message;
import io.vertx.core.json.JsonObject;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import net.bluemind.configfile.core.CoreConfig;
import net.bluemind.core.api.fault.ErrorCode;
import net.bluemind.core.api.fault.ServerFault;
import net.bluemind.core.task.api.ITask;
import net.bluemind.core.task.api.TaskRef;
import net.bluemind.core.task.api.TaskStatus;
import net.bluemind.core.task.service.IServerTask;
import net.bluemind.core.task.service.ITasksManager;
import net.bluemind.core.task.service.LoggingTaskMonitor;
import net.bluemind.core.task.service.internal.cq.CQTaskManager;
import net.bluemind.core.utils.CancellableRunnable;
import net.bluemind.core.utils.FutureThreadInfo;
import net.bluemind.lib.vertx.VertxPlatform;
import net.bluemind.lib.vertx.WorkerExecutorService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:net/bluemind/core/task/service/internal/TasksManager.class */
public class TasksManager implements ITasksManager {
    public static final String TASKS_MANAGER_EVENT = "tasks-manager";
    private static final Cache<String, TaskManager> completedTasks;
    private static final Cache<String, TaskStatus> statusHistory;
    private static final ExecutorService executor;
    private final Vertx vertx;
    private static Logger logger = LoggerFactory.getLogger(TasksManager.class);
    private static final Object ROOT_TASK_MARKER = new Object();
    private static final ConcurrentHashMap<String, TaskManager> tasks = new ConcurrentHashMap<>();
    private static final ConcurrentHashMap<String, FutureThreadInfo> futures = new ConcurrentHashMap<>();
    private static final FastThreadLocal<Object> threadLocal = new FastThreadLocal<>();
    private static final ExecutorService directExecutor = MoreExecutors.newDirectExecutorService();

    /* loaded from: input_file:net/bluemind/core/task/service/internal/TasksManager$EventBusReceiveVerticle.class */
    public static class EventBusReceiveVerticle extends AbstractVerticle {
        private TasksManager tasksManager;

        public EventBusReceiveVerticle(TasksManager tasksManager) {
            this.tasksManager = tasksManager;
        }

        public void start() {
            this.vertx.eventBus().consumer(TasksManager.TASKS_MANAGER_EVENT).handler(message -> {
                String string = ((JsonObject) message.body()).getString("id");
                TaskManager taskManager = this.tasksManager.getTaskManager(string);
                if (taskManager != null) {
                    taskManager.handle((Message<JsonObject>) message);
                } else if (TasksManager.logger.isDebugEnabled()) {
                    TasksManager.logger.debug("[{}] task manager not found", string);
                }
            });
        }
    }

    static {
        Config config = CoreConfig.get();
        executor = new WorkerExecutorService("bm-tasks", config.getInt("core.pool.tasks.size"), 1L, TimeUnit.DAYS, () -> {
            threadLocal.set(ROOT_TASK_MARKER);
        });
        statusHistory = Caffeine.newBuilder().maximumSize(4096L).build();
        completedTasks = Caffeine.newBuilder().maximumSize(512L).expireAfterWrite(config.getDuration("core.pool.tasks.completed-timeout")).evictionListener((str, taskManager, removalCause) -> {
            if (taskManager != null) {
                statusHistory.put(str, taskManager.status);
                VertxPlatform.getVertx().setTimer(5000L, l -> {
                    VertxPlatform.getVertx().executeBlocking(() -> {
                        VertxPlatform.eventBus().publish("tasks.manager.cleanups.expire", str);
                        cleanupTask(taskManager);
                        return null;
                    });
                });
            }
        }).build();
    }

    public TasksManager(Vertx vertx) {
        this.vertx = vertx;
        vertx.deployVerticle(new EventBusReceiveVerticle(this));
    }

    @Override // net.bluemind.core.task.service.ITasksManager
    public TaskRef run(IServerTask iServerTask) {
        return run(UUID.randomUUID().toString(), null, iServerTask);
    }

    @Override // net.bluemind.core.task.service.ITasksManager
    public TaskRef run(Logger logger2, IServerTask iServerTask) {
        return run(UUID.randomUUID().toString(), logger2, iServerTask);
    }

    @Override // net.bluemind.core.task.service.ITasksManager
    public TaskRef run(String str, IServerTask iServerTask) {
        return run(str, null, iServerTask);
    }

    @Override // net.bluemind.core.task.service.ITasksManager
    public TaskRef run(String str, Logger logger2, IServerTask iServerTask) {
        CQTaskManager cQTaskManager = new CQTaskManager(str);
        LoggingTaskMonitor loggingTaskMonitor = new LoggingTaskMonitor(logger2, new TaskMonitor(this.vertx.eventBus(), str), 0);
        TaskManager putIfAbsent = tasks.putIfAbsent(str, cQTaskManager);
        if (putIfAbsent != null) {
            if (!putIfAbsent.status().state.ended) {
                throw new ServerFault("task " + str + " already running");
            }
            cleanupTask(putIfAbsent);
        }
        try {
            executeTask(str, iServerTask, loggingTaskMonitor, cQTaskManager, inTaskThread() ? directExecutor : executor);
            return TaskRef.create(str);
        } catch (RejectedExecutionException unused) {
            cleanupTask(cQTaskManager);
            throw new ServerFault("The task has been rejected by the thread pool", ErrorCode.FAILURE);
        }
    }

    @Override // net.bluemind.core.task.service.ITasksManager
    public boolean inTaskThread() {
        return ROOT_TASK_MARKER == threadLocal.get();
    }

    private void executeTask(final String str, final IServerTask iServerTask, final LoggingTaskMonitor loggingTaskMonitor, final TaskManager taskManager, ExecutorService executorService) {
        CancellableRunnable cancellableRunnable = new CancellableRunnable() { // from class: net.bluemind.core.task.service.internal.TasksManager.1
            public void run() {
                try {
                    CompletableFuture execute = iServerTask.execute(loggingTaskMonitor);
                    LoggingTaskMonitor loggingTaskMonitor2 = loggingTaskMonitor;
                    String str2 = str;
                    TaskManager taskManager2 = taskManager;
                    CompletableFuture<Void> thenAccept = execute.thenAccept(r8 -> {
                        loggingTaskMonitor2.end(true, "", null);
                        TasksManager.completedTasks.put(str2, taskManager2);
                    });
                    String str3 = str;
                    LoggingTaskMonitor loggingTaskMonitor3 = loggingTaskMonitor;
                    TaskManager taskManager3 = taskManager;
                    thenAccept.exceptionally(th -> {
                        String message = (!(th instanceof CompletionException) || th.getCause() == null) ? th.getMessage() : th.getCause().getMessage();
                        TasksManager.logger.error("error in task {}", str3, th);
                        loggingTaskMonitor3.end(false, message, null);
                        TasksManager.completedTasks.put(str3, taskManager3);
                        return null;
                    });
                } catch (Exception e) {
                    TasksManager.logger.error("error in task {}", str, e);
                    loggingTaskMonitor.end(false, e.getMessage(), null);
                    TasksManager.completedTasks.put(str, taskManager);
                }
            }

            public void cancel() {
                iServerTask.cancel();
            }
        };
        tasks.put(str, taskManager);
        futures.put(str, new FutureThreadInfo(executorService.submit((Runnable) cancellableRunnable), cancellableRunnable));
    }

    private static void cleanupTask(TaskManager taskManager) {
        if (tasks.remove(taskManager.getId(), taskManager)) {
            futures.remove(taskManager.getId());
            taskManager.cleanUp();
        }
    }

    @Override // net.bluemind.core.task.service.ITasksManager
    public ITask getTask(String str) {
        TaskManager taskManager = tasks.get(str);
        if (taskManager != null) {
            return new TaskService(taskManager);
        }
        TaskStatus taskStatus = (TaskStatus) statusHistory.getIfPresent(str);
        if (taskStatus != null) {
            return new ExpiredTaskService(str, taskStatus);
        }
        return null;
    }

    public TaskManager getTaskManager(String str) {
        return tasks.get(str);
    }

    @Override // net.bluemind.core.task.service.ITasksManager
    public void cancel(String str) {
        if (futures.containsKey(str)) {
            logger.info("Cancelling Task {}", str);
            futures.get(str).runnable.cancel();
            futures.get(str).future.cancel(true);
        }
    }

    @VisibleForTesting
    public static void reset() {
        tasks.forEach((str, taskManager) -> {
            futures.get(str).runnable.cancel();
            futures.get(str).future.cancel(true);
            cleanupTask(taskManager);
        });
        tasks.clear();
        completedTasks.invalidateAll();
    }
}
