package net.bluemind.central.reverse.proxy.model.common.kafka.impl;

import com.google.common.util.concurrent.RateLimiter;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import java.time.Duration;
import java.util.List;
import java.util.Objects;
import java.util.Properties;
import net.bluemind.central.reverse.proxy.model.common.kafka.KafkaConsumerClient;
import net.bluemind.central.reverse.proxy.model.common.kafka.LagConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:net/bluemind/central/reverse/proxy/model/common/kafka/impl/KafkaConsumerClientImpl.class */
/**
 * {@link KafkaConsumerClient} implementation that polls a {@link KafkaConsumer} from a dedicated
 * platform thread named {@code kafka-consumer} and dispatches the polled records to the configured
 * handler.
 *
 * <p>If a batch handler is set it receives every non-empty {@link ConsumerRecords}; otherwise, if a
 * per-record handler is set, it receives each record in turn. The batch handler takes precedence
 * when both are configured.
 *
 * <p>The future returned by {@link #subscribe(List)} completes, unless {@link #infinite(boolean)}
 * was set to {@code true}, on the first empty poll that happens either after at least one
 * non-empty poll or after more than {@value #MIN_UPTIME_BEFORE_EMPTY_CONSUMPTION} seconds of
 * uptime. Polling continues after the future has completed.
 */
public class KafkaConsumerClientImpl<K, V> implements KafkaConsumerClient<K, V> {
    private static final Logger logger = LoggerFactory.getLogger(KafkaConsumerClientImpl.class);

    /** Uptime, in seconds, after which an empty poll counts as "nothing to consume". */
    private static final int MIN_UPTIME_BEFORE_EMPTY_CONSUMPTION = 10;

    /** Timeout, in seconds, passed to each {@link KafkaConsumer#poll(Duration)} call. */
    private static final int POLL_DURATION_IN_SECONDS = 1;

    /** Permits per second handed to the lag-check {@link RateLimiter}. */
    private static final double LAG_CHECKS_PER_SECOND = 0.1d;

    private final KafkaConsumer<K, V> consumer;
    private long startTimeInMillis;
    private Handler<ConsumerRecord<K, V>> handler;
    private Handler<ConsumerRecords<K, V>> batchHandler;
    private LagConsumer lagConsumer;
    private RateLimiter lagCheckLimiter;
    private boolean hadRecords = false;
    private boolean infinite = false;

    /**
     * Creates the underlying {@link KafkaConsumer} from the given configuration.
     *
     * @param properties Kafka consumer configuration
     */
    public KafkaConsumerClientImpl(Properties properties) {
        this.consumer = new KafkaConsumer<>(properties);
    }

    /**
     * Sets the per-record handler. It is only used when no batch handler is configured.
     *
     * @param handler callback invoked for each polled record
     * @return this client, for chaining
     */
    @Override // net.bluemind.central.reverse.proxy.model.common.kafka.KafkaConsumerClient
    public KafkaConsumerClientImpl<K, V> handler(Handler<ConsumerRecord<K, V>> handler) {
        this.handler = handler;
        return this;
    }

    /**
     * Sets the lag consumer and (re)creates the rate limiter that throttles how often it is
     * invoked from the polling loop.
     *
     * @param lagConsumer callback given the underlying consumer to check lag
     * @return this client, for chaining
     */
    @Override // net.bluemind.central.reverse.proxy.model.common.kafka.KafkaConsumerClient
    public KafkaConsumerClient<K, V> lagConsumer(LagConsumer lagConsumer) {
        // The limiter is assigned first so the polling loop never sees a lag consumer without it.
        this.lagCheckLimiter = RateLimiter.create(LAG_CHECKS_PER_SECOND);
        this.lagConsumer = lagConsumer;
        return this;
    }

    /**
     * Sets the batch handler. When set, it takes precedence over the per-record handler.
     *
     * @param handler callback invoked with each non-empty batch of polled records
     * @return this client, for chaining
     */
    @Override // net.bluemind.central.reverse.proxy.model.common.kafka.KafkaConsumerClient
    public KafkaConsumerClientImpl<K, V> batchHandler(Handler<ConsumerRecords<K, V>> handler) {
        this.batchHandler = handler;
        return this;
    }

    /**
     * Controls whether the subscription future may complete on empty consumption.
     *
     * @param z {@code true} to never complete the subscription future on an empty poll
     * @return this client, for chaining
     */
    @Override // net.bluemind.central.reverse.proxy.model.common.kafka.KafkaConsumerClient
    public KafkaConsumerClient<K, V> infinite(boolean z) {
        this.infinite = z;
        return this;
    }

    /**
     * Subscribes the underlying consumer to the given topics, records the start time and starts
     * the polling thread.
     *
     * @param list topics to subscribe to
     * @return a future completed on the first qualifying empty poll (see class documentation), or
     *         failed if the polling loop terminates with a runtime exception before that
     */
    @Override // net.bluemind.central.reverse.proxy.model.common.kafka.KafkaConsumerClient
    public Future<Void> subscribe(List<String> list) {
        this.consumer.subscribe(list);
        this.startTimeInMillis = System.currentTimeMillis();
        return consume();
    }

    /** Starts the polling thread and returns the future tied to its promise. */
    private Future<Void> consume() {
        Promise<Void> promise = Promise.promise();
        consume(promise);
        return promise.future();
    }

    /**
     * Starts the polling thread. The loop never exits normally; if it dies on a runtime exception
     * the promise is failed (when still pending) so callers are not left waiting forever, and the
     * exception is rethrown so the thread still terminates as it did before.
     */
    private void consume(Promise<Void> promise) {
        Thread.ofPlatform().name("kafka-consumer").start(() -> {
            try {
                while (true) {
                    pollOnce(promise);
                }
            } catch (RuntimeException e) {
                logger.error("kafka consumer loop terminated", e);
                promise.tryFail(e);
                throw e;
            }
        });
    }

    /** Runs one poll/dispatch iteration, then the rate-limited lag check. */
    private void pollOnce(Promise<Void> promise) {
        boolean handled = handle(this.consumer.poll(Duration.ofSeconds(POLL_DURATION_IN_SECONDS)));
        if (!this.infinite && isEmptyConsumption(handled, promise)) {
            promise.complete();
        } else if (handled) {
            this.hadRecords = true;
        }
        if (this.lagConsumer != null && this.lagCheckLimiter.tryAcquire()) {
            this.lagConsumer.checkLag(this.consumer);
        }
    }

    /**
     * Tells whether the current (empty) poll should complete the still-pending promise.
     *
     * @param handled whether the last poll returned records
     * @param promise the subscription promise
     * @return {@code true} when the poll was empty, the promise is not yet complete, and either
     *         records were seen earlier or the minimum uptime has elapsed
     */
    private boolean isEmptyConsumption(boolean handled, Promise<Void> promise) {
        if (handled || promise.future().isComplete()) {
            return false;
        }
        return this.hadRecords || upTimeInSeconds() > MIN_UPTIME_BEFORE_EMPTY_CONSUMPTION;
    }

    /**
     * Dispatches a polled batch to the batch handler if set, else to the per-record handler if
     * set, else to nobody.
     *
     * @param consumerRecords the polled records, possibly {@code null} or empty
     * @return {@code false} when there was nothing to dispatch, {@code true} otherwise (even if no
     *         handler is configured)
     */
    private boolean handle(ConsumerRecords<K, V> consumerRecords) {
        if (consumerRecords == null || consumerRecords.count() == 0) {
            return false;
        }
        logger.debug("consuming {} records", consumerRecords.count());
        if (this.batchHandler != null) {
            this.batchHandler.handle(consumerRecords);
        } else if (this.handler != null) {
            // Bind the handler once so every record of this batch goes to the same instance.
            Handler<ConsumerRecord<K, V>> recordHandler = this.handler;
            consumerRecords.forEach(recordHandler::handle);
        }
        return true;
    }

    /** Returns the whole seconds elapsed since {@link #subscribe(List)} recorded the start time. */
    private long upTimeInSeconds() {
        return (System.currentTimeMillis() - this.startTimeInMillis) / 1000;
    }
}
