package net.bluemind.sds.sync.service.internal.stream;

import io.netty.buffer.ByteBufUtil;
import io.netty.util.concurrent.DefaultThreadFactory;
import io.vertx.core.Context;
import io.vertx.core.Handler;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.json.JsonObject;
import io.vertx.core.streams.ReadStream;
import io.vertx.core.streams.StreamBase;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.function.Supplier;
import net.bluemind.core.api.Stream;
import net.bluemind.core.api.fault.ErrorCode;
import net.bluemind.core.api.fault.ServerFault;
import net.bluemind.sds.sync.api.SdsSyncEvent;
import net.bluemind.sds.sync.service.SdsSyncLock;
import net.bluemind.sds.sync.service.internal.RocksDBLookup;
import net.bluemind.sds.sync.service.internal.queue.SdsSyncQueue;
import net.openhft.chronicle.queue.ExcerptTailer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:net/bluemind/sds/sync/service/internal/stream/CQSdsSyncReadStream.class */
public class CQSdsSyncReadStream implements ReadStream<Buffer>, Stream {
    private static final Logger logger = LoggerFactory.getLogger(CQSdsSyncReadStream.class);
    private static final ExecutorService SSEXECUTOR = Executors.newSingleThreadExecutor(new DefaultThreadFactory("sds-sync-cq-stream"));
    private static final int DEFAULT_SLICES_SLOT_SIZE = 512;
    private final SdsSyncQueue q;
    private Handler<Throwable> exceptionHandler;
    private final AtomicBoolean paused;
    private final AtomicBoolean ended;
    private final AtomicLong lastIndex;
    private final AtomicLong currentIndex;
    private final ExcerptTailer tailer;
    private final RocksDBLookup dbLookup;
    private Handler<Buffer> handler;
    private Handler<Void> endHandler;
    private final Context context;
    private List<JsonObject> slices;
    private final int slotSliceSize;

    public CQSdsSyncReadStream(Context context, long j, RocksDBLookup rocksDBLookup) {
        this(context, j, rocksDBLookup, DEFAULT_SLICES_SLOT_SIZE);
    }

    public CQSdsSyncReadStream(Context context, long j, RocksDBLookup rocksDBLookup, int i) {
        this.paused = new AtomicBoolean(true);
        this.ended = new AtomicBoolean(false);
        this.lastIndex = new AtomicLong(0L);
        this.currentIndex = new AtomicLong(0L);
        this.context = context;
        Objects.requireNonNull(context);
        this.q = new SdsSyncQueue();
        this.tailer = getTailer(j);
        this.dbLookup = rocksDBLookup;
        this.slices = new ArrayList(i);
        this.slotSliceSize = i;
    }

    private static <T> CompletableFuture<T> onCqThread(Supplier<T> supplier) {
        return CompletableFuture.supplyAsync(supplier, SSEXECUTOR).exceptionally((Function) th -> {
            logger.error("failure on CQ thread", th);
            return null;
        });
    }

    private ExcerptTailer getTailer(long j) {
        try {
            return (ExcerptTailer) onCqThread(() -> {
                ExcerptTailer createTailer = this.q.createTailer();
                this.lastIndex.set(this.q.queue().lastIndex());
                if (j > 0) {
                    createTailer.moveToIndex(j);
                } else {
                    createTailer.toStart();
                }
                return createTailer;
            }).get(5L, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw ServerFault.create(ErrorCode.TIMEOUT, e);
        }
    }

    public CQSdsSyncReadStream exceptionHandler(Handler<Throwable> handler) {
        this.exceptionHandler = handler;
        return this;
    }

    public CQSdsSyncReadStream handler(Handler<Buffer> handler) {
        this.handler = handler;
        return this;
    }

    /* renamed from: pause, reason: merged with bridge method [inline-methods] */
    public CQSdsSyncReadStream m6pause() {
        this.paused.set(true);
        return this;
    }

    private void end() {
        if (this.ended.compareAndSet(false, true)) {
            if (this.endHandler != null) {
                this.context.runOnContext(r4 -> {
                    this.endHandler.handle((Object) null);
                });
            }
            close();
        }
    }

    private void read() {
        if (this.paused.get() || this.ended.get()) {
            return;
        }
        fetchPending();
        if (!this.slices.isEmpty() || this.currentIndex.get() < this.lastIndex.get()) {
            return;
        }
        end();
    }

    private void fetchPending() {
        if (this.handler == null || this.ended.get()) {
            return;
        }
        if (SdsSyncLock.get().isLocked()) {
            throw new ServerFault("Sds sync rebuild in progress, please retry in a few minutes");
        }
        if (this.slices.isEmpty()) {
            this.context.executeBlocking(this::populateSlices, true).andThen(asyncResult -> {
                if (asyncResult.failed()) {
                    if (this.exceptionHandler != null) {
                        this.context.runOnContext(r5 -> {
                            this.exceptionHandler.handle(asyncResult.cause());
                        });
                        return;
                    } else {
                        logger.error("no exception handler for {}", asyncResult.cause().getMessage(), asyncResult.cause());
                        return;
                    }
                }
                if (this.slices.isEmpty()) {
                    end();
                } else {
                    this.context.runOnContext(r3 -> {
                        read();
                    });
                }
            });
        } else {
            handleFirstSliceData();
            this.context.runOnContext(r3 -> {
                read();
            });
        }
    }

    private void handleFirstSliceData() {
        JsonObject jsonObject = (JsonObject) this.slices.removeFirst();
        if (jsonObject == null) {
            end();
            return;
        }
        if (Boolean.FALSE.equals(jsonObject.getBoolean("deleted", false))) {
            jsonObject.remove("deleted");
            Buffer buffer = Buffer.buffer(jsonObject.encode());
            try {
                this.context.runOnContext(r5 -> {
                    this.handler.handle(buffer);
                });
            } catch (Exception e) {
                logger.error("IllegalStateException received {}: setting handler to null (request canceled?)", e.getMessage());
                end();
            }
        }
    }

    private Boolean populateSlices() throws InterruptedException, ExecutionException, TimeoutException {
        if (!this.tailer.isClosed() && !this.tailer.isClosing()) {
            return (Boolean) onCqThread(() -> {
                try {
                    try {
                        readDocuments();
                        this.currentIndex.set(this.tailer.index());
                        return Boolean.valueOf(!this.slices.isEmpty());
                    } catch (Throwable th) {
                        logger.error("unknown error", th);
                        throw th;
                    }
                } catch (Throwable th2) {
                    this.currentIndex.set(this.tailer.index());
                    throw th2;
                }
            }).get(5L, TimeUnit.SECONDS);
        }
        logger.error("Refused to fetch: tailer {} is closing", this.tailer);
        throw new ServerFault("tailer is closing or closed");
    }

    private void readDocuments() {
        AtomicInteger atomicInteger = new AtomicInteger(0);
        while (atomicInteger.intValue() < this.slotSliceSize && this.tailer.readDocument(wireIn -> {
            wireIn.read("sdssync").marshallable(wireIn -> {
                String text = wireIn.read("type").text();
                JsonObject jsonObject = new JsonObject();
                jsonObject.put("type", text);
                if (SdsSyncEvent.FHADD.name().equals(text)) {
                    jsonObject.put("key", wireIn.read("key").text());
                } else {
                    byte[] bytes = wireIn.read("key").bytes();
                    if (text.equals(SdsSyncEvent.BODYADD.name())) {
                        jsonObject.put("deleted", Boolean.valueOf(this.dbLookup.exists(bytes)));
                    }
                    jsonObject.put("key", ByteBufUtil.hexDump(bytes));
                    jsonObject.put("srv", wireIn.read("srv").text());
                }
                jsonObject.put("index", Long.valueOf(this.tailer.index()));
                this.slices.add(jsonObject);
                atomicInteger.incrementAndGet();
            });
        })) {
        }
    }

    /* renamed from: resume, reason: merged with bridge method [inline-methods] */
    public CQSdsSyncReadStream m9resume() {
        this.paused.set(false);
        read();
        return this;
    }

    /* renamed from: fetch, reason: merged with bridge method [inline-methods] */
    public CQSdsSyncReadStream m12fetch(long j) {
        return this;
    }

    public CQSdsSyncReadStream endHandler(Handler<Void> handler) {
        this.endHandler = handler;
        return this;
    }

    private void close() {
        m6pause();
        try {
            this.q.close();
        } catch (Exception e) {
            logger.error("error while closing", e);
            if (this.exceptionHandler != null) {
                this.context.runOnContext(r5 -> {
                    this.exceptionHandler.handle(e);
                });
            }
        }
        if (this.dbLookup != null) {
            try {
                this.dbLookup.close();
            } catch (Exception unused) {
            }
        }
    }

    /* renamed from: handler, reason: collision with other method in class */
    public /* bridge */ /* synthetic */ ReadStream m7handler(Handler handler) {
        return handler((Handler<Buffer>) handler);
    }

    /* renamed from: endHandler, reason: collision with other method in class */
    public /* bridge */ /* synthetic */ ReadStream m8endHandler(Handler handler) {
        return endHandler((Handler<Void>) handler);
    }

    /* renamed from: exceptionHandler, reason: collision with other method in class */
    public /* bridge */ /* synthetic */ StreamBase m10exceptionHandler(Handler handler) {
        return exceptionHandler((Handler<Throwable>) handler);
    }

    /* renamed from: exceptionHandler, reason: collision with other method in class */
    public /* bridge */ /* synthetic */ ReadStream m11exceptionHandler(Handler handler) {
        return exceptionHandler((Handler<Throwable>) handler);
    }
}
