package net.bluemind.retry.support;

import com.google.common.base.Stopwatch;
import com.google.common.util.concurrent.RateLimiter;
import com.netflix.spectator.api.Counter;
import com.netflix.spectator.api.Registry;
import com.netflix.spectator.api.Timer;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.Context;
import io.vertx.core.DeploymentOptions;
import io.vertx.core.Future;
import io.vertx.core.ThreadingModel;
import io.vertx.core.eventbus.EventBus;
import io.vertx.core.eventbus.MessageConsumer;
import io.vertx.core.json.JsonObject;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import net.bluemind.lib.vertx.VertxContext;
import net.bluemind.lib.vertx.VertxPlatform;
import net.bluemind.metrics.registry.IdFactory;
import net.bluemind.metrics.registry.MetricsRegistry;
import net.bluemind.retry.support.common.RetryQueue;
import net.bluemind.retry.support.keydb.KeydbQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:net/bluemind/retry/support/RetryQueueVerticle.class */
public class RetryQueueVerticle extends AbstractVerticle {
    private static final Logger logger = LoggerFactory.getLogger(RetryQueueVerticle.class);
    private final String topic;
    private final RetryProcessor rp;
    private final RetryQueue persistentQueue;
    private final Counter writeCounter;
    private final RateLimiter limitFlushes;
    private MessageConsumer<JsonObject> cons;
    private MessageConsumer<?> compactCons;
    private Future<String> depFuture;
    private static final String TOPIC_TAG = "topic";
    private static final String RETRY_ID = "retry";

    /* loaded from: input_file:net/bluemind/retry/support/RetryQueueVerticle$FlushCompanionVerticle.class */
    private static class FlushCompanionVerticle extends AbstractVerticle {
        private static final Logger logger = LoggerFactory.getLogger(FlushCompanionVerticle.class);
        private final RetryQueue persistentQueue;
        private final Timer flushesTimer;
        private final Counter flushedItemsCounter;
        private final String topic;
        private final RetryProcessor rp;
        private MessageConsumer<?> cons;
        private long periodic;
        private final AtomicBoolean flushing = new AtomicBoolean();

        public FlushCompanionVerticle(RetryQueue retryQueue, RetryProcessor retryProcessor, String str) {
            this.persistentQueue = retryQueue;
            this.rp = retryProcessor;
            Registry registry = MetricsRegistry.get();
            IdFactory idFactory = new IdFactory(RetryQueueVerticle.RETRY_ID, registry, RetryRequester.class);
            this.flushesTimer = registry.timer(idFactory.name("flushes", new String[]{RetryQueueVerticle.TOPIC_TAG, str}));
            this.flushedItemsCounter = registry.counter(idFactory.name("flushedItems", new String[]{RetryQueueVerticle.TOPIC_TAG, str}));
            this.topic = str;
        }

        public void start() throws Exception {
            this.cons = this.vertx.eventBus().consumer("retry.flush." + this.topic, message -> {
                try {
                    flushRetries();
                } catch (Exception e) {
                    logger.error(e.getMessage(), e);
                }
            });
            this.periodic = VertxPlatform.executeBlockingPeriodic(this.vertx, 60000L, l -> {
                this.persistentQueue.compact();
            });
        }

        public void stop() throws Exception {
            this.vertx.cancelTimer(this.periodic);
            this.cons.unregister();
            super.stop();
        }

        private void flushRetries() {
            if (!this.flushing.compareAndSet(false, true)) {
                logger.warn("Flush already in progress");
                return;
            }
            RetryQueue.Tailer reader = this.persistentQueue.reader();
            Stopwatch createStarted = Stopwatch.createStarted();
            CompletableFuture<Long> completableFuture = new CompletableFuture<>();
            untilStopped(completableFuture, VertxContext.getOrCreateDuplicatedContext(), reader, 0L);
            completableFuture.whenComplete((l, th) -> {
                this.flushesTimer.record(createStarted.elapsed(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS);
                if (th != null) {
                    this.flushedItemsCounter.increment(l.longValue());
                }
                this.flushing.compareAndSet(true, false);
            });
        }

        private void untilStopped(CompletableFuture<Long> completableFuture, Context context, RetryQueue.Tailer tailer, long j) {
            RetryQueue.QueueRecord next = tailer.next();
            if (next == null) {
                context.runOnContext(r7 -> {
                    completableFuture.complete(Long.valueOf(j));
                });
                return;
            }
            CompletableFuture completableFuture2 = new CompletableFuture();
            Thread.ofVirtual().name("flush:" + this.topic).start(() -> {
                try {
                    this.rp.retry(new JsonObject(next.payload()));
                    context.runOnContext(r4 -> {
                        completableFuture2.complete(null);
                    });
                } catch (Exception e) {
                    context.runOnContext(r5 -> {
                        completableFuture2.completeExceptionally(e);
                    });
                }
            });
            completableFuture2.whenComplete((r15, th) -> {
                if (th != null) {
                    completableFuture.complete(Long.valueOf(j));
                } else {
                    tailer.commit();
                    untilStopped(completableFuture, context, tailer, j + 1);
                }
            });
        }
    }

    /* loaded from: input_file:net/bluemind/retry/support/RetryQueueVerticle$RetryProcessor.class */
    public interface RetryProcessor {
        void retry(JsonObject jsonObject) throws Exception;
    }

    protected RetryQueueVerticle(String str, RetryProcessor retryProcessor) {
        this.topic = str;
        this.rp = retryProcessor;
        this.persistentQueue = new KeydbQueue(str);
        Registry registry = MetricsRegistry.get();
        this.writeCounter = registry.counter(new IdFactory(RETRY_ID, registry, RetryRequester.class).name("writes", new String[]{TOPIC_TAG, str}));
        this.limitFlushes = RateLimiter.create(1.0d);
    }

    public String topic() {
        return this.topic;
    }

    public String address() {
        return "retry." + this.topic;
    }

    public void start() throws Exception {
        this.depFuture = this.vertx.deployVerticle(() -> {
            return new FlushCompanionVerticle(this.persistentQueue, this.rp, this.topic);
        }, new DeploymentOptions().setInstances(1).setThreadingModel(ThreadingModel.WORKER));
        EventBus eventBus = this.vertx.eventBus();
        String str = "retry.flush." + this.topic;
        AtomicLong atomicLong = new AtomicLong();
        this.cons = eventBus.consumer("retry." + this.topic, message -> {
            this.persistentQueue.writer().write(((JsonObject) message.body()).encode());
            this.writeCounter.increment();
            logger.debug("WRITE {}", this.topic);
            this.vertx.cancelTimer(atomicLong.get());
            if (this.limitFlushes.tryAcquire()) {
                eventBus.send(str, "F");
            } else {
                atomicLong.set(this.vertx.setTimer(250L, l -> {
                    eventBus.send(str, "F");
                }));
            }
            message.reply(0L);
        });
        this.compactCons = eventBus.consumer("retry.compact." + this.topic, message2 -> {
            this.persistentQueue.compact();
            message2.reply(0L);
        });
    }

    public void stop() throws Exception {
        this.cons.unregister();
        this.compactCons.unregister();
        if (this.depFuture.succeeded()) {
            this.vertx.undeploy((String) this.depFuture.result());
        }
        super.stop();
    }
}
