package net.bluemind.core.task.service.internal.cq;

import com.google.common.base.MoreObjects;
import io.vertx.core.Handler;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.eventbus.Message;
import io.vertx.core.json.JsonObject;
import io.vertx.core.streams.ReadStream;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ConcurrentLinkedQueue;
import net.bluemind.core.task.service.internal.LogStream;
import net.bluemind.core.task.service.internal.TaskManager;

/* loaded from: input_file:net/bluemind/core/task/service/internal/cq/CQTaskManager.class */
/**
 * {@link TaskManager} that stores the log entries of a task in a
 * {@link PersistentQueue} and notifies the {@link LogStream} readers handed
 * out by {@link #log()} each time a new entry is pushed.
 */
public class CQTaskManager extends TaskManager implements Handler<Message<JsonObject>> {
    /** Queue receiving every entry passed to {@code pushLog}. */
    private final PersistentQueue jsQueue;

    /**
     * Streams returned by {@link #log()}; each one is woken up on every push.
     * A concurrent queue is used so registration, removal and iteration do not
     * need external locking.
     */
    private final ConcurrentLinkedQueue<LogStream> readers = new ConcurrentLinkedQueue<>();

    /**
     * Creates the manager and the persistent queue backing it.
     *
     * @param str value forwarded to the {@link TaskManager} constructor and used
     *            to create the queue (presumably the task id; confirm against
     *            {@code TaskManager})
     */
    public CQTaskManager(String str) {
        super(str);
        this.jsQueue = PersistentQueue.createFor(str);
    }

    /** Closes the underlying queue. */
    @Override // net.bluemind.core.task.service.internal.TaskManager
    public void cleanUp() {
        this.jsQueue.close();
    }

    /**
     * Opens a new log stream on the queue and registers it so that later pushes
     * wake it up.
     *
     * <p>NOTE(review): the stream is only unregistered from its exception
     * handler; nothing in this class removes a stream that was ended normally.
     *
     * @return a stream reading from a queue subscriber obtained with argument
     *         {@code 0} (presumably the first entry; confirm against
     *         {@code PersistentQueue#subscriber})
     */
    @Override // net.bluemind.core.task.service.internal.TaskManager
    public ReadStream<Buffer> log() {
        LogStream stream = new LogStream(this.jsQueue.subscriber(0));
        registerReader(stream);
        // A failing stream must stop receiving wake-ups.
        stream.exceptionHandler(failure -> {
            this.readers.remove(stream);
        });
        return stream;
    }

    /**
     * Collects the {@code "message"} field of every entry fetched from a queue
     * subscriber created with {@code i}.
     *
     * @param i argument handed to the queue subscriber (presumably the index of
     *          the first entry to read; confirm against {@code PersistentQueue})
     * @return the messages in fetch order; an entry without a message
     *         contributes an empty string
     */
    @Override // net.bluemind.core.task.service.internal.TaskManager
    public List<String> getCurrentLogs(int i) {
        List<String> messages = new ArrayList<>(64);
        this.jsQueue.subscriber(i).fetchAll(entry -> {
            // A missing or null message is reported as "" rather than null.
            messages.add(Optional.ofNullable(entry.getString("message")).orElse(""));
        });
        return messages;
    }

    /** Adds {@code logStream} to the set of streams notified by {@code pushLog}. */
    private void registerReader(LogStream logStream) {
        this.readers.add(logStream);
    }

    /**
     * Appends an entry to the queue, then wakes up every registered reader.
     *
     * @param jsonObject entry to store in the queue
     * @param z          when {@code true}, each reader is also ended right after
     *                   being woken up
     */
    @Override // net.bluemind.core.task.service.internal.TaskManager
    protected void pushLog(JsonObject jsonObject, boolean z) {
        // Store first so that woken readers can see the new entry.
        this.jsQueue.put(jsonObject);
        for (LogStream reader : this.readers) {
            reader.wakeUp();
            if (z) {
                reader.end();
            }
        }
    }

    /** Describes this manager by task id, status and backing queue. */
    @Override
    public String toString() {
        return MoreObjects.toStringHelper(CQTaskManager.class)
                .add("taskId", this.taskId)
                .add("status", this.status)
                .add("queue", this.jsQueue)
                .toString();
    }
}
