package net.bluemind.core.task.service.internal.cq;

import io.vertx.core.json.JsonObject;
import java.io.File;
import java.io.IOException;
import java.nio.file.FileVisitOption;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.Comparator;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import net.bluemind.core.api.fault.ErrorCode;
import net.bluemind.core.api.fault.ServerFault;
import net.bluemind.core.task.service.internal.ISubscriber;
import net.bluemind.core.task.service.internal.cq.LoopProvider;
import net.bluemind.lib.vertx.VertxPlatform;
import net.openhft.chronicle.core.Jvm;
import net.openhft.chronicle.core.io.AbstractReferenceCounted;
import net.openhft.chronicle.queue.ExcerptAppender;
import net.openhft.chronicle.queue.ExcerptTailer;
import net.openhft.chronicle.queue.impl.single.SingleChronicleQueue;
import net.openhft.chronicle.queue.impl.single.SingleChronicleQueueBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:net/bluemind/core/task/service/internal/cq/PersistentQueue.class */
public class PersistentQueue implements AutoCloseable {
    private static final String QUEUES_ROOT = System.getProperty("chronicle.queues.root", "/var/cache/bm-core/tasks-queues");
    private static final Logger logger = LoggerFactory.getLogger(PersistentQueue.class);
    private static final AtomicLong uid = new AtomicLong();
    private final SingleChronicleQueue queue;
    private final String tid;
    private final long subId;
    private final ExcerptAppender appender;
    private final LoopProvider.Loop loop = LoopProvider.get();
    private final AtomicBoolean closeOnce = new AtomicBoolean(false);

    /* loaded from: input_file:net/bluemind/core/task/service/internal/cq/PersistentQueue$Subscriber.class */
    public static class Subscriber implements ISubscriber {
        private final ExcerptTailer tail;
        private final String tid;
        private final LoopProvider.Loop loop;

        private Subscriber(String str, ExcerptTailer excerptTailer, LoopProvider.Loop loop) {
            this.tid = str;
            this.tail = excerptTailer;
            this.loop = loop;
        }

        @Override // net.bluemind.core.task.service.internal.ISubscriber
        public void fetchAll(Consumer<JsonObject> consumer) {
            try {
                PersistentQueue.onCqThread(this.loop, () -> {
                    fetchAllImpl(consumer);
                    return null;
                }).get(20L, TimeUnit.SECONDS);
            } catch (Exception e) {
                throw ServerFault.create(ErrorCode.TIMEOUT, e);
            }
        }

        private void fetchAllImpl(Consumer<JsonObject> consumer) {
            while (true) {
                String readText = this.tail.readText();
                if (readText == null) {
                    return;
                } else {
                    consumer.accept(new JsonObject(readText));
                }
            }
        }

        @Override // net.bluemind.core.task.service.internal.ISubscriber
        public JsonObject fetchOne() {
            try {
                return (JsonObject) PersistentQueue.onCqThread(this.loop, this::fetchOneImpl).get(10L, TimeUnit.SECONDS);
            } catch (Exception e) {
                throw ServerFault.create(ErrorCode.TIMEOUT, e);
            }
        }

        private JsonObject fetchOneImpl() {
            try {
                String readText = this.tail.readText();
                if (readText != null) {
                    return new JsonObject(readText);
                }
                return null;
            } catch (Exception e) {
                PersistentQueue.logger.error("[{}] reading from queue failed", this.tid, e);
                return null;
            }
        }

        @Override // net.bluemind.core.task.service.internal.ISubscriber
        public String taskId() {
            return this.tid;
        }

        @Override // net.bluemind.core.task.service.internal.ISubscriber, java.lang.AutoCloseable
        public void close() {
            this.tail.close();
        }
    }

    static {
        System.setProperty("chronicle.disk.monitor.disable", "true");
        System.setProperty("chronicle.analytics.disable", "true");
        Jvm.setResourceTracing(false);
        AbstractReferenceCounted.disableReferenceTracing();
        try {
            Files.walk(Paths.get(QUEUES_ROOT, new String[0]), new FileVisitOption[0]).sorted(Comparator.reverseOrder()).map((v0) -> {
                return v0.toFile();
            }).forEach((v0) -> {
                v0.delete();
            });
        } catch (IOException unused) {
        }
    }

    public static PersistentQueue createFor(String str) {
        File file = new File(QUEUES_ROOT);
        file.mkdirs();
        long incrementAndGet = uid.incrementAndGet();
        return new PersistentQueue(str, incrementAndGet, SingleChronicleQueueBuilder.single(new File(file, str + "." + incrementAndGet)).blockSize(524288L).build());
    }

    private PersistentQueue(String str, long j, SingleChronicleQueue singleChronicleQueue) {
        this.tid = str;
        this.subId = j;
        this.queue = singleChronicleQueue;
        LoopProvider.Loop loop = this.loop;
        singleChronicleQueue.getClass();
        this.appender = (ExcerptAppender) onCqThread(loop, singleChronicleQueue::createAppender).orTimeout(10L, TimeUnit.SECONDS).join();
    }

    public String toString() {
        return "CQ{uid: " + this.subId + "}";
    }

    public synchronized boolean put(JsonObject jsonObject) {
        if (!this.queue.isClosed()) {
            try {
                return ((Boolean) onCqThread(this.loop, () -> {
                    this.appender.writeText(jsonObject.encode());
                    return true;
                }).get(10L, TimeUnit.SECONDS)).booleanValue();
            } catch (Exception e) {
                throw ServerFault.create(ErrorCode.TIMEOUT, e);
            }
        }
        if (!logger.isWarnEnabled()) {
            return false;
        }
        logger.warn("Queue {} {} is closed, msg ({}) dropped", new Object[]{this.tid, Long.valueOf(this.subId), jsonObject.encode()});
        return false;
    }

    private static <T> CompletableFuture<T> onCqThread(LoopProvider.Loop loop, Supplier<T> supplier) {
        return CompletableFuture.supplyAsync(supplier, loop.pool()).exceptionally((Function) th -> {
            logger.error("failure on CQ thread", th);
            return null;
        });
    }

    public ISubscriber subscriber(int i) {
        try {
            return new Subscriber(this.tid, (ExcerptTailer) onCqThread(this.loop, () -> {
                ExcerptTailer createTailer = this.queue.createTailer();
                if (i > 0) {
                    long index = createTailer.index();
                    if (!createTailer.moveToIndex(index + i)) {
                        logger.debug("Failed to seek to CQ cycle {}, moving to end.", Long.valueOf(index + i));
                        createTailer.toEnd();
                    }
                }
                return createTailer;
            }).get(10L, TimeUnit.SECONDS), this.loop);
        } catch (Exception e) {
            throw ServerFault.create(ErrorCode.TIMEOUT, e);
        }
    }

    @Override // java.lang.AutoCloseable
    public void close() {
        File file = this.queue.file();
        if (this.closeOnce.compareAndSet(false, true)) {
            onCqThread(this.loop, () -> {
                this.appender.close();
                this.queue.close();
                this.loop.unRef();
                return true;
            }).orTimeout(10L, TimeUnit.SECONDS).join();
        }
        VertxPlatform.getVertx().executeBlocking(() -> {
            try {
                File[] listFiles = file.listFiles();
                if (listFiles != null) {
                    Arrays.stream(listFiles).forEach((v0) -> {
                        v0.delete();
                    });
                }
                Files.delete(file.toPath());
                logger.info("[{}] CQ deleted ({}).", this.tid, Long.valueOf(this.subId));
                return null;
            } catch (NoSuchFileException unused) {
                return null;
            }
        }, false).andThen(asyncResult -> {
            if (asyncResult.failed()) {
                logger.error("CQ cleanup failed", asyncResult.cause());
            }
        });
    }
}
