package net.bluemind.central.reverse.proxy.model.impl;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.RateLimiter;
import com.typesafe.config.Config;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.DeploymentOptions;
import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.eventbus.MessageProducer;
import io.vertx.core.json.JsonObject;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;
import java.lang.runtime.ObjectMethods;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import net.bluemind.central.reverse.proxy.common.ProxyEventBusAddress;
import net.bluemind.central.reverse.proxy.model.PostfixMapsStore;
import net.bluemind.central.reverse.proxy.model.ProxyInfoStore;
import net.bluemind.central.reverse.proxy.model.RecordHandler;
import net.bluemind.central.reverse.proxy.model.client.PostfixMapsStoreClient;
import net.bluemind.central.reverse.proxy.model.client.ProxyInfoStoreClient;
import net.bluemind.central.reverse.proxy.model.common.kafka.InstallationTopics;
import net.bluemind.central.reverse.proxy.model.common.kafka.KafkaConsumerClient;
import net.bluemind.central.reverse.proxy.model.common.kafka.LagConsumer;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:net/bluemind/central/reverse/proxy/model/impl/ProxyInfoVerticle.class */
public class ProxyInfoVerticle extends AbstractVerticle {
    /** Configuration handed to the constructor; read for Kafka and consumer settings. */
    private final Config config;

    /** Value of {@code bm.kafka.bootstrap.servers}, read once from {@link #config}. */
    private final String bootstrapServers;

    /** Handler given to every Kafka consumer; created in {@code start}. */
    private RecordHandler<byte[], byte[]> recordHandler;

    /** Store obtained from {@link #proxyInfoStoreSupplier}; {@code null} until a "stream-ready" message arrives. */
    private ProxyInfoStore proxyInfoStore;

    /** Store obtained from {@link #postfixMapsStoreSupplier}; {@code null} until a "stream-ready" message arrives. */
    private PostfixMapsStore postfixMapsStore;

    private final Supplier<PostfixMapsStore> postfixMapsStoreSupplier;
    private final Supplier<ProxyInfoStore> proxyInfoStoreSupplier;
    private final Logger logger = LoggerFactory.getLogger(ProxyInfoVerticle.class);

    /**
     * Every consumer client created by the deployed consumer verticles.
     *
     * <p>Each verticle instance appends to this list from its own {@code start()}, and
     * Vert.x may run those instances on different event-loop threads, so the list has
     * to tolerate concurrent writers.
     */
    private final List<KafkaConsumerClient<byte[], byte[]>> kafkaConsumers = new CopyOnWriteArrayList<>();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:net/bluemind/central/reverse/proxy/model/impl/ProxyInfoVerticle$PartLag.class */
    /**
     * Lag recorded for one topic partition.
     *
     * <p>Declared as a real record so that {@code equals}, {@code hashCode} and
     * {@code toString} are generated by the compiler instead of being spelled out
     * through {@code ObjectMethods.bootstrap}, which is not valid in source form.
     *
     * @param topic     name of the topic the partition belongs to
     * @param partition partition number within the topic
     * @param lag       lag value recorded for that partition
     */
    public record PartLag(String topic, int partition, long lag) {

        /**
         * Builds the identifier used to index this partition in a lag map.
         *
         * @return the topic name and partition number joined by {@code #}
         */
        String key() {
            return this.topic + "#" + this.partition;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:net/bluemind/central/reverse/proxy/model/impl/ProxyInfoVerticle$PublishingLagConsumer.class */
    /**
     * {@link LagConsumer} that remembers the latest lag of every partition it has seen
     * and publishes the total on the {@code model.lag} event-bus address.
     */
    public static class PublishingLagConsumer implements LagConsumer {
        /** Latest lag per partition, keyed by {@link PartLag#key()}. */
        private final ConcurrentHashMap<String, Long> lags = new ConcurrentHashMap<>();
        /** Throttles publication to 0.1 permits per second. */
        private final RateLimiter reportLimit = RateLimiter.create(0.1d);
        private final MessageProducer<Long> pub;
        private Logger logger;

        /**
         * @param vertx  instance whose event bus carries the published lag
         * @param logger logger used for the debug trace of each check
         */
        public PublishingLagConsumer(Vertx vertx, Logger logger) {
            this.logger = logger;
            this.pub = vertx.eventBus().publisher("model.lag");
        }

        /**
         * Records the current lag of each partition assigned to the consumer and,
         * when the rate limiter allows it, publishes the sum of all recorded lags.
         *
         * @param kafkaConsumer consumer whose assigned partitions are inspected
         */
        public void checkLag(KafkaConsumer<?, ?> kafkaConsumer) {
            // Value recorded when the consumer cannot report a lag for a partition.
            final long unknownLag = Integer.MAX_VALUE;

            long consumerTotal = 0L;
            for (var assigned : kafkaConsumer.assignment()) {
                PartLag entry = new PartLag(assigned.topic(), assigned.partition(),
                        kafkaConsumer.currentLag(assigned).orElse(unknownLag));
                this.lags.put(entry.key(), entry.lag());
                consumerTotal += entry.lag();
            }

            if (this.logger.isDebugEnabled()) {
                this.logger.debug("Check LAG ({}) on {}", consumerTotal, kafkaConsumer);
            }

            if (!this.reportLimit.tryAcquire()) {
                return;
            }
            // Sum over every partition recorded so far, not only this consumer's assignment.
            long overall = this.lags.reduceValuesToLong(64L, Long::longValue, 0L, Long::sum);
            this.pub.write(overall);
        }
    }

    /**
     * Creates the verticle.
     *
     * @param config    configuration; must define {@code bm.kafka.bootstrap.servers}
     * @param supplier  provides the {@link ProxyInfoStore} when the model is started
     * @param supplier2 provides the {@link PostfixMapsStore} when the model is started
     */
    public ProxyInfoVerticle(Config config, Supplier<ProxyInfoStore> supplier, Supplier<PostfixMapsStore> supplier2) {
        this.proxyInfoStoreSupplier = supplier;
        this.postfixMapsStoreSupplier = supplier2;
        this.config = config;
        this.bootstrapServers = this.config.getString("bm.kafka.bootstrap.servers");
    }

    /**
     * Creates the record handler and registers an event-bus consumer on
     * {@code proxy-address}, then completes {@code promise} immediately.
     *
     * <p>The stores and the Kafka consumers are only set up later, when a message
     * whose {@code action} header is {@code stream-ready} is received.
     *
     * @param promise completed as soon as the event-bus consumer is registered
     */
    public void start(Promise<Void> promise) {
        this.recordHandler = RecordHandler.createByteHandler(
                ProxyInfoStoreClient.create(this.vertx),
                PostfixMapsStoreClient.create(this.vertx),
                this.vertx);
        this.logger.info("[model] Starting");

        this.vertx.eventBus().consumer("proxy-address").handler(event -> {
            boolean streamReady = "stream-ready".equals(event.headers().get("action"));
            if (!streamReady) {
                return;
            }
            this.logger.info("[model] Dir entries stream ready, starting model");
            this.proxyInfoStore = this.proxyInfoStoreSupplier.get().setupService(this.vertx);
            this.postfixMapsStore = this.postfixMapsStoreSupplier.get().setupService(this.vertx);

            // The message body carries the topic names as JSON.
            JsonObject payload = (JsonObject) event.body();
            InstallationTopics topics = payload.mapTo(InstallationTopics.class);
            startKafkaConsumption(topics)
                    .onSuccess(done -> this.logger.info("[model] Started"))
                    .onFailure(cause -> this.logger.error("[model] Failed to start model", cause));
        });

        promise.complete();
    }

    /**
     * Deploys the Kafka consumers, then announces the model as ready.
     *
     * @param installationTopics topics to consume and to announce
     * @return a future with no value that settles once both steps are done
     */
    private Future<Void> startKafkaConsumption(InstallationTopics installationTopics) {
        Future<InstallationTopics> deployed = deployKafkaConsumer(installationTopics);
        Future<InstallationTopics> announced = deployed.map(this::publishTopics);
        return announced.mapEmpty();
    }

    /**
     * Deploys the Kafka consumer verticles and waits until each has subscribed.
     *
     * <p>The instance count is the smaller of the available processors and
     * {@code bm.crp.model.number-of-consumer}. All consumers share one randomly
     * suffixed group id and one {@link PublishingLagConsumer}.
     *
     * @param installationTopics topics whose names are subscribed to
     * @return a future yielding {@code installationTopics} once every consumer has
     *         subscribed, or failing if deployment or any subscription fails
     */
    private Future<InstallationTopics> deployKafkaConsumer(InstallationTopics installationTopics) {
        List<String> topicNames = topicNamesToConsume(installationTopics);
        AtomicInteger clientCounter = new AtomicInteger();
        // One promise per consumer, keyed by client id, settled when its subscription settles.
        Map<String, Promise<Void>> subscriptions = new ConcurrentHashMap<>();
        PublishingLagConsumer lagConsumer = new PublishingLagConsumer(this.vertx, this.logger);
        String groupId = this.config.getString("bm.crp.model.consumer-group-prefix") + "-" + UUID.randomUUID();
        int instances = Math.min(Runtime.getRuntime().availableProcessors(),
                this.config.getInt("bm.crp.model.number-of-consumer"));

        return this.vertx.deployVerticle(() -> new AbstractVerticle() {
            public void start() throws Exception {
                String clientId = ProxyInfoVerticle.this.config.getString("bm.crp.model.client-id-prefix")
                        + "-" + clientCounter.incrementAndGet();
                Promise<Void> subscribed = Promise.promise();
                subscriptions.put(clientId, subscribed);

                KafkaConsumerClient<byte[], byte[]> consumer = ProxyInfoVerticle.this.createConsumer(groupId, clientId);
                ProxyInfoVerticle.this.kafkaConsumers.add(consumer);
                consumer.handler(ProxyInfoVerticle.this.recordHandler)
                        .lagConsumer(lagConsumer)
                        .subscribe(topicNames)
                        .onSuccess(ignored -> subscribed.complete())
                        // Without this a failed subscription would leave the promise pending
                        // forever and the returned future would never settle.
                        .onFailure(cause -> subscribed.fail(cause));
            }
        }, new DeploymentOptions().setInstances(instances)).flatMap(deploymentId -> {
            List<Future<Void>> pending = subscriptions.values().stream()
                    .map(Promise::future)
                    .collect(Collectors.toList());
            return Future.all(pending).map(allSubscribed -> installationTopics);
        });
    }

    /**
     * Logs the announcement and schedules, 5000 ms later, the publication of the
     * topics on {@code proxy-address} with {@link ProxyEventBusAddress#MODEL_READY}.
     *
     * @param installationTopics topics to publish as JSON
     * @return the same {@code installationTopics}, returned without waiting for the timer
     */
    private InstallationTopics publishTopics(InstallationTopics installationTopics) {
        this.logger.info("[model] Announcing model ready");
        // The JSON payload is built when the timer fires, not when it is scheduled.
        this.vertx.setTimer(5000L, timerId -> this.vertx.eventBus().publish(
                "proxy-address",
                JsonObject.mapFrom(installationTopics),
                ProxyEventBusAddress.MODEL_READY));
        return installationTopics;
    }

    /**
     * Builds a byte-array Kafka consumer client bound to the configured bootstrap servers.
     *
     * @param str  value used for {@code group.id}
     * @param str2 value used for {@code client.id}
     * @return a client created from the assembled consumer properties
     */
    private KafkaConsumerClient<byte[], byte[]> createConsumer(String str, String str2) {
        String byteArrayDeserializer = ByteArrayDeserializer.class.getName();

        Properties consumerProps = new Properties();
        consumerProps.setProperty("bootstrap.servers", this.bootstrapServers);
        consumerProps.setProperty("key.deserializer", byteArrayDeserializer);
        consumerProps.setProperty("value.deserializer", byteArrayDeserializer);
        consumerProps.setProperty("group.id", str);
        consumerProps.setProperty("client.id", str2);
        consumerProps.setProperty("auto.offset.reset", "earliest");
        consumerProps.setProperty("enable.auto.commit", "true");
        return KafkaConsumerClient.create(consumerProps);
    }

    /**
     * Lists the topic names to subscribe to.
     *
     * @param installationTopics source of the topic names
     * @return a new mutable list holding {@code crpTopicName} followed by {@code orphans}
     */
    private List<String> topicNamesToConsume(InstallationTopics installationTopics) {
        // Typed list instead of a raw ArrayList, so no unchecked conversion on return.
        List<String> topicNames = new ArrayList<>(2);
        topicNames.add(installationTopics.crpTopicName);
        topicNames.add(installationTopics.orphans);
        return topicNames;
    }

    /**
     * Tears down whichever stores have been set up; stores that were never created
     * are skipped.
     *
     * <p>The second store is torn down even when the first one throws. If both throw,
     * the exception from the second one is the one that propagates.
     *
     * @throws InterruptedException if a store's tear-down is interrupted
     * @throws ExecutionException   if a store's tear-down fails
     */
    @VisibleForTesting
    public void tearDown() throws InterruptedException, ExecutionException {
        try {
            if (this.proxyInfoStore != null) {
                this.proxyInfoStore.tearDown();
            }
        } finally {
            // Must run regardless of the outcome above so this store is never left behind.
            if (this.postfixMapsStore != null) {
                this.postfixMapsStore.tearDown();
            }
        }
    }
}
