package net.bluemind.core.task.service.internal.cq;

import io.netty.util.concurrent.DefaultThreadFactory;
import io.vertx.core.json.JsonObject;
import java.io.File;
import java.io.IOException;
import java.lang.reflect.Method;
import java.nio.file.Files;
import java.util.Arrays;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import net.bluemind.core.api.fault.ErrorCode;
import net.bluemind.core.api.fault.ServerFault;
import net.bluemind.core.task.service.internal.ISubscriber;
import net.bluemind.lib.vertx.VertxPlatform;
import net.openhft.chronicle.core.io.AbstractCloseable;
import net.openhft.chronicle.core.io.AbstractReferenceCounted;
import net.openhft.chronicle.queue.ExcerptAppender;
import net.openhft.chronicle.queue.ExcerptTailer;
import net.openhft.chronicle.queue.impl.single.SingleChronicleQueue;
import net.openhft.chronicle.queue.impl.single.SingleChronicleQueueBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:net/bluemind/core/task/service/internal/cq/PersistentQueue.class */
public class PersistentQueue implements AutoCloseable {
    private static final String QUEUES_ROOT = "/var/cache/bm-core/tasks-queues";
    private static final Logger logger = LoggerFactory.getLogger(PersistentQueue.class);
    private static final ExecutorService TAIL_LOOP = Executors.newSingleThreadExecutor(new DefaultThreadFactory("cq-tail-loop"));
    private static final AtomicLong uid = new AtomicLong();
    private static final Consumer<SingleChronicleQueueBuilder> preBuilder;
    private final SingleChronicleQueue queue;
    private final String tid;
    private final long subId;
    private final ExcerptAppender appender;

    /* loaded from: input_file:net/bluemind/core/task/service/internal/cq/PersistentQueue$OpenAppenderQueue.class */
    private static class OpenAppenderQueue extends SingleChronicleQueue {
        protected OpenAppenderQueue(SingleChronicleQueueBuilder singleChronicleQueueBuilder) {
            super(singleChronicleQueueBuilder);
        }

        public ExcerptAppender createAppender() {
            return newAppender();
        }
    }

    /* loaded from: input_file:net/bluemind/core/task/service/internal/cq/PersistentQueue$Subscriber.class */
    public static class Subscriber implements ISubscriber {
        private final ExcerptTailer tail;
        private final String tid;

        private Subscriber(String str, ExcerptTailer excerptTailer) {
            this.tid = str;
            this.tail = excerptTailer;
        }

        @Override // net.bluemind.core.task.service.internal.ISubscriber
        public void fetchAll(Consumer<JsonObject> consumer) {
            try {
                PersistentQueue.onCqThread(() -> {
                    fetchAllImpl(consumer);
                    return null;
                }).get(20L, TimeUnit.SECONDS);
            } catch (Exception e) {
                throw ServerFault.create(ErrorCode.TIMEOUT, e);
            }
        }

        private void fetchAllImpl(Consumer<JsonObject> consumer) {
            while (true) {
                String readText = this.tail.readText();
                if (readText == null) {
                    return;
                } else {
                    consumer.accept(new JsonObject(readText));
                }
            }
        }

        @Override // net.bluemind.core.task.service.internal.ISubscriber
        public JsonObject fetchOne() {
            try {
                return (JsonObject) PersistentQueue.onCqThread(this::fetchOneImpl).get(10L, TimeUnit.SECONDS);
            } catch (Exception e) {
                throw ServerFault.create(ErrorCode.TIMEOUT, e);
            }
        }

        private JsonObject fetchOneImpl() {
            try {
                String readText = this.tail.readText();
                if (readText != null) {
                    return new JsonObject(readText);
                }
                return null;
            } catch (Exception e) {
                PersistentQueue.logger.error("[{}] reading from queue failed", this.tid, e);
                return null;
            }
        }

        @Override // net.bluemind.core.task.service.internal.ISubscriber
        public String taskId() {
            return this.tid;
        }

        /* synthetic */ Subscriber(String str, ExcerptTailer excerptTailer, Subscriber subscriber) {
            this(str, excerptTailer);
        }
    }

    static {
        System.setProperty("chronicle.announcer.disable", "true");
        System.setProperty("chronicle.disk.monitor.disable", "true");
        AbstractReferenceCounted.disableReferenceTracing();
        AbstractCloseable.disableCloseableTracing();
        preBuilder = initPrebuild();
    }

    private static Consumer<SingleChronicleQueueBuilder> initPrebuild() {
        try {
            Method declaredMethod = SingleChronicleQueueBuilder.class.getDeclaredMethod("preBuild", new Class[0]);
            declaredMethod.setAccessible(true);
            return singleChronicleQueueBuilder -> {
                try {
                    declaredMethod.invoke(singleChronicleQueueBuilder, new Object[0]);
                } catch (Exception e) {
                    throw new ServerFault(e);
                }
            };
        } catch (Exception e) {
            throw new ServerFault(e);
        }
    }

    public static PersistentQueue createFor(String str) {
        File file = new File(QUEUES_ROOT);
        file.mkdirs();
        long incrementAndGet = uid.incrementAndGet();
        SingleChronicleQueueBuilder blockSize = SingleChronicleQueueBuilder.single(new File(file, String.valueOf(str) + "." + incrementAndGet)).blockSize(524288L);
        preBuilder.accept(blockSize);
        return new PersistentQueue(str, incrementAndGet, new OpenAppenderQueue(blockSize));
    }

    private PersistentQueue(String str, long j, OpenAppenderQueue openAppenderQueue) {
        this.tid = str;
        this.subId = j;
        this.queue = openAppenderQueue;
        openAppenderQueue.getClass();
        this.appender = (ExcerptAppender) onCqThread(openAppenderQueue::createAppender).join();
    }

    public String toString() {
        return "CQ{uid: " + this.subId + "}";
    }

    public synchronized void put(JsonObject jsonObject) {
        if (this.queue.isClosed()) {
            if (logger.isWarnEnabled()) {
                logger.warn("Queue {} {} is closed, msg ({}) dropped", new Object[]{this.tid, Long.valueOf(this.subId), jsonObject.encode()});
            }
        } else {
            try {
                onCqThread(() -> {
                    this.appender.writeText(jsonObject.encode());
                    return null;
                }).get(10L, TimeUnit.SECONDS);
            } catch (Exception e) {
                throw ServerFault.create(ErrorCode.TIMEOUT, e);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static <T> CompletableFuture<T> onCqThread(Supplier<T> supplier) {
        return CompletableFuture.supplyAsync(supplier, TAIL_LOOP).exceptionally((Function) th -> {
            logger.error("failure on CQ thread", th);
            return null;
        });
    }

    public ISubscriber subscriber(int i) {
        try {
            return new Subscriber(this.tid, (ExcerptTailer) onCqThread(() -> {
                ExcerptTailer createTailer = this.queue.createTailer();
                if (i > 0) {
                    long index = createTailer.index();
                    if (!createTailer.moveToIndex(index + i)) {
                        logger.debug("Failed to seek to CQ cycle {}, moving to end.", Long.valueOf(index + i));
                        createTailer.toEnd();
                    }
                }
                return createTailer;
            }).get(10L, TimeUnit.SECONDS), null);
        } catch (Exception e) {
            throw ServerFault.create(ErrorCode.TIMEOUT, e);
        }
    }

    @Override // java.lang.AutoCloseable
    public void close() {
        File file = this.queue.file();
        onCqThread(() -> {
            this.appender.close();
            this.queue.close();
            return null;
        }).join();
        VertxPlatform.getVertx().executeBlocking(promise -> {
            try {
                Arrays.stream(file.listFiles()).forEach((v0) -> {
                    v0.delete();
                });
                Files.delete(file.toPath());
                logger.info("[{}] CQ deleted ({}).", this.tid, Long.valueOf(this.subId));
            } catch (IOException e) {
                logger.error("[{}] failed to delete queue dir", this.tid, e);
            } finally {
                promise.complete();
            }
        }, false, asyncResult -> {
            if (asyncResult.failed()) {
                logger.error("CQ cleanup failed", asyncResult.cause());
            }
        });
    }
}
