package net.bluemind.core.backup.store.kafka;

import com.google.common.base.MoreObjects;
import com.google.common.base.Stopwatch;
import com.google.common.util.concurrent.RateLimiter;
import com.netflix.spectator.api.Gauge;
import com.netflix.spectator.api.Registry;
import com.typesafe.config.Config;
import io.netty.util.concurrent.DefaultThreadFactory;
import io.vertx.core.json.JsonObject;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import net.bluemind.config.InstallationId;
import net.bluemind.core.backup.continuous.IRecordStarvationStrategy;
import net.bluemind.core.backup.continuous.RecordStarvationStrategies;
import net.bluemind.core.backup.continuous.store.ITopicStore;
import net.bluemind.core.backup.continuous.store.RecordHandler;
import net.bluemind.core.backup.continuous.store.TopicSubscriber;
import net.bluemind.core.backup.store.kafka.config.KafkaStoreConfig;
import net.bluemind.core.backup.store.kafka.metrics.KafkaMetric;
import net.bluemind.core.backup.store.kafka.metrics.KafkaTopicMetrics;
import net.bluemind.lib.vertx.VertxPlatform;
import net.bluemind.metrics.registry.IdFactory;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:net/bluemind/core/backup/store/kafka/KafkaTopicSubscriber.class */
public class KafkaTopicSubscriber implements TopicSubscriber {
    private static final Logger logger = LoggerFactory.getLogger(KafkaTopicSubscriber.class);
    private static final AtomicInteger CONS_ID_ALLOCATOR = new AtomicInteger();
    private final String bootstrapServer;
    private final List<String> topicNames;
    private final Registry reg;
    private final IdFactory idFactory;
    private RateLimiter lagReportRateLimiter = RateLimiter.create(0.5d);

    /* loaded from: input_file:net/bluemind/core/backup/store/kafka/KafkaTopicSubscriber$SetCurrentOffsetsOnPartitionAssigned.class */
    public static class SetCurrentOffsetsOnPartitionAssigned implements ConsumerRebalanceListener {
        private Consumer<?, ?> consumer;
        private IRecordStarvationStrategy strat;

        public SetCurrentOffsetsOnPartitionAssigned(Consumer<?, ?> consumer, IRecordStarvationStrategy iRecordStarvationStrategy) {
            this.consumer = consumer;
            this.strat = iRecordStarvationStrategy;
        }

        public void onPartitionsRevoked(Collection<TopicPartition> collection) {
        }

        public void onPartitionsAssigned(Collection<TopicPartition> collection) {
            this.strat.updateOffsets((Map) this.consumer.committed(new HashSet(collection)).entrySet().stream().collect(Collectors.toMap(entry -> {
                return ((TopicPartition) entry.getKey()).topic() + "-" + ((TopicPartition) entry.getKey()).partition();
            }, entry2 -> {
                return Long.valueOf(entry2.getValue() != null ? ((OffsetAndMetadata) entry2.getValue()).offset() : 0L);
            })));
        }
    }

    public KafkaTopicSubscriber(String str, List<String> list, Registry registry, IdFactory idFactory) {
        this.bootstrapServer = str;
        this.topicNames = list;
        this.reg = registry;
        this.idFactory = idFactory;
    }

    public String topicName() {
        return (String) this.topicNames.getFirst();
    }

    public ITopicStore.IResumeToken subscribe(RecordHandler recordHandler) {
        return subscribe(null, recordHandler);
    }

    public ITopicStore.IResumeToken subscribe(ITopicStore.IResumeToken iResumeToken, RecordHandler recordHandler) {
        return subscribe(iResumeToken, recordHandler, RecordStarvationStrategies.EARLY_ABORT);
    }

    public ITopicStore.IResumeToken subscribe(ITopicStore.IResumeToken iResumeToken, RecordHandler recordHandler, IRecordStarvationStrategy iRecordStarvationStrategy) {
        KafkaToken kafkaToken = (KafkaToken) iResumeToken;
        if (kafkaToken == null) {
            kafkaToken = new KafkaToken("clone-" + UUID.randomUUID().toString().replace("-", "") + "-of-" + InstallationId.getIdentifier().replace("bluemind-", ""), Integer.parseInt(System.getProperty("backup.continuous.clone.workers", Math.max(4, Runtime.getRuntime().availableProcessors() - 2))));
        }
        iRecordStarvationStrategy.checkpoint(topicName(), kafkaToken);
        ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(kafkaToken.workers, new DefaultThreadFactory("clone-p-" + topicName()));
        CompletableFuture[] completableFutureArr = new CompletableFuture[kafkaToken.workers];
        String str = kafkaToken.groupId;
        ParallelStarvationHandler parallelStarvationHandler = new ParallelStarvationHandler(iRecordStarvationStrategy, kafkaToken.workers, getEndOffsets(str));
        int incrementAndGet = CONS_ID_ALLOCATOR.incrementAndGet();
        for (int i = 0; i < kafkaToken.workers; i++) {
            String str2 = "cons-" + incrementAndGet + "-client-" + i;
            completableFutureArr[i] = CompletableFuture.supplyAsync(() -> {
                logger.info("Starting {} for topic {}", str2, topicName());
                return Long.valueOf(consumeLoop(recordHandler, parallelStarvationHandler, str, str2));
            }, newFixedThreadPool);
        }
        CompletableFuture.allOf(completableFutureArr).join();
        newFixedThreadPool.shutdown();
        logger.info("[{}] ending subscribe loop", topicName());
        return kafkaToken;
    }

    private Map<String, Long> getEndOffsets(String str) {
        Throwable th = null;
        try {
            KafkaConsumer<byte[], byte[]> createKafkaConsumer = createKafkaConsumer(str, "cons-" + CONS_ID_ALLOCATOR.incrementAndGet() + "-getendoffset");
            try {
                ArrayList arrayList = new ArrayList();
                for (String str2 : this.topicNames) {
                    arrayList.addAll(createKafkaConsumer.partitionsFor(str2).stream().map(partitionInfo -> {
                        return new TopicPartition(str2, partitionInfo.partition());
                    }).toList());
                }
                createKafkaConsumer.assign(arrayList);
                do {
                    createKafkaConsumer.poll(Duration.ofMillis(100L));
                } while (createKafkaConsumer.assignment().isEmpty());
                Map<String, Long> map = (Map) createKafkaConsumer.endOffsets(arrayList).entrySet().stream().collect(Collectors.toMap(entry -> {
                    return ((TopicPartition) entry.getKey()).topic() + "-" + ((TopicPartition) entry.getKey()).partition();
                }, (v0) -> {
                    return v0.getValue();
                }));
                if (createKafkaConsumer != null) {
                    createKafkaConsumer.close();
                }
                return map;
            } catch (Throwable th2) {
                if (createKafkaConsumer != null) {
                    createKafkaConsumer.close();
                }
                throw th2;
            }
        } catch (Throwable th3) {
            if (0 == 0) {
                th = th3;
            } else if (null != th3) {
                th.addSuppressed(th3);
            }
            throw th;
        }
    }

    private long consumeLoop(RecordHandler recordHandler, IRecordStarvationStrategy iRecordStarvationStrategy, String str, String str2) {
        AtomicLong atomicLong = new AtomicLong();
        Throwable th = null;
        try {
            KafkaConsumer<byte[], byte[]> createKafkaConsumer = createKafkaConsumer(str, str2);
            try {
                createKafkaConsumer.subscribe(this.topicNames, new SetCurrentOffsetsOnPartitionAssigned(createKafkaConsumer, iRecordStarvationStrategy));
                JsonObject put = new JsonObject().put("topic", topicName());
                while (true) {
                    ConsumerRecords<byte[], byte[]> poll = createKafkaConsumer.poll(Duration.ofMillis(500L));
                    iRecordStarvationStrategy.onRecordsReceived(put.put("records", Integer.valueOf(poll.count())));
                    Stream stream = poll.partitions().stream();
                    Function function = topicPartition -> {
                        return topicPartition.topic() + "-" + topicPartition.partition();
                    };
                    createKafkaConsumer.getClass();
                    iRecordStarvationStrategy.updateOffsets((Map) stream.collect(Collectors.toMap(function, createKafkaConsumer::position)));
                    if (this.lagReportRateLimiter.tryAcquire()) {
                        reportLag(str, str2, createKafkaConsumer);
                    }
                    if (!poll.isEmpty()) {
                        if (logger.isDebugEnabled()) {
                            logger.debug("{}: {} record(s)", topicName(), Integer.valueOf(poll.count()));
                        }
                        processRecords(recordHandler, createKafkaConsumer, atomicLong, poll);
                    }
                    if (poll.isEmpty() || iRecordStarvationStrategy.isTopicFinished()) {
                        if (!createKafkaConsumer.assignment().isEmpty()) {
                            IRecordStarvationStrategy.ExpectedBehaviour expectedBehaviour = IRecordStarvationStrategy.ExpectedBehaviour.RETRY;
                            if (iRecordStarvationStrategy.onStarvation(new JsonObject().put("topic", topicName()).put("cid", str2).put("records", Long.valueOf(atomicLong.get()))) == IRecordStarvationStrategy.ExpectedBehaviour.ABORT) {
                                break;
                            }
                        }
                    }
                    createKafkaConsumer.commitAsync();
                }
                createKafkaConsumer.commitSync();
                reportLag(str, str2, createKafkaConsumer);
                if (createKafkaConsumer != null) {
                    createKafkaConsumer.close();
                }
                return atomicLong.longValue();
            } catch (Throwable th2) {
                if (createKafkaConsumer != null) {
                    createKafkaConsumer.close();
                }
                throw th2;
            }
        } catch (Throwable th3) {
            if (0 == 0) {
                th = th3;
            } else if (null != th3) {
                th.addSuppressed(th3);
            }
            throw th;
        }
    }

    private void processRecords(RecordHandler recordHandler, KafkaConsumer<byte[], byte[]> kafkaConsumer, AtomicLong atomicLong, ConsumerRecords<byte[], byte[]> consumerRecords) {
        long j = 0;
        long duration = KafkaStoreConfig.get().getDuration("kafka.consumer.maxPollInterval", TimeUnit.MILLISECONDS) / consumerRecords.count();
        for (TopicPartition topicPartition : consumerRecords.partitions()) {
            for (ConsumerRecord consumerRecord : consumerRecords.records(topicPartition)) {
                try {
                    Stopwatch createStarted = Stopwatch.createStarted();
                    recordHandler.accept((byte[]) consumerRecord.key(), (byte[]) consumerRecord.value(), consumerRecord.partition(), consumerRecord.offset());
                    long elapsed = createStarted.elapsed(TimeUnit.MILLISECONDS);
                    if (elapsed > duration && logger.isWarnEnabled()) {
                        logger.warn("[part {} - offset {}] processing of {} was slow {}ms (budget is {}ms)", new Object[]{Integer.valueOf(consumerRecord.partition()), Long.valueOf(consumerRecord.offset()), new String((byte[]) consumerRecord.key()), Long.valueOf(elapsed), Long.valueOf(duration)});
                    }
                    j = consumerRecord.offset();
                    atomicLong.incrementAndGet();
                } catch (Exception e) {
                    KafkaStoreConfig.PoisonPillStrategy poisonPillStrategy = (KafkaStoreConfig.PoisonPillStrategy) KafkaStoreConfig.get().getEnum(KafkaStoreConfig.PoisonPillStrategy.class, "kafka.consumer.poisonPillStrategy");
                    logger.error("[part {} - offset {}] handler {} failed, strategy is {}", new Object[]{Integer.valueOf(consumerRecord.partition()), Long.valueOf(consumerRecord.offset()), recordHandler, poisonPillStrategy, e});
                    poisonPillStrategy.apply((byte[]) consumerRecord.value(), e);
                    kafkaConsumer.commitSync(Map.of(topicPartition, new OffsetAndMetadata(j, "handler has failed to process: " + e.getMessage())));
                    throw e;
                }
            }
        }
    }

    private void reportLag(String str, String str2, KafkaConsumer<byte[], byte[]> kafkaConsumer) {
        String str3 = str + "-" + str2;
        Gauge gauge = this.reg.gauge(this.idFactory.name(KafkaTopicMetrics.LAG, new String[]{"groupAndClient", str3}));
        LongAdder longAdder = new LongAdder();
        kafkaConsumer.assignment().forEach(topicPartition -> {
            kafkaConsumer.currentLag(topicPartition).ifPresent(j -> {
                if (j > 0 && logger.isDebugEnabled()) {
                    logger.debug("**** LAG part {} => {}", Integer.valueOf(topicPartition.partition()), Long.valueOf(j));
                }
                longAdder.add(j);
            });
        });
        gauge.set(longAdder.doubleValue());
        VertxPlatform.eventBus().publish("bm.monitoring.fw.kafka.metrics", new KafkaMetric(str3, KafkaTopicMetrics.LAG, longAdder.sum(), KafkaTopicMetrics.ClientEnum.CONSUMER.name()).toJsonObj());
    }

    public ITopicStore.IResumeToken parseToken(JsonObject jsonObject) {
        return new KafkaToken(jsonObject.getString("group"), jsonObject.getInteger("workers", 4).intValue());
    }

    private KafkaConsumer<byte[], byte[]> createKafkaConsumer(String str, String str2) {
        if (logger.isDebugEnabled()) {
            logger.debug("bootstrap: {}, clientId: {}, inst: {}", new Object[]{this.bootstrapServer, str2, InstallationId.getIdentifier()});
        }
        Properties properties = new Properties();
        Config config = KafkaStoreConfig.get();
        properties.setProperty("bootstrap.servers", this.bootstrapServer);
        properties.setProperty("group.id", str);
        properties.setProperty("client.id", str2);
        properties.setProperty("key.deserializer", ByteArrayDeserializer.class.getCanonicalName());
        properties.setProperty("value.deserializer", ByteArrayDeserializer.class.getCanonicalName());
        properties.setProperty("enable.auto.commit", "false");
        properties.setProperty("auto.commit.interval.ms", "1000");
        properties.setProperty("auto.offset.reset", "earliest");
        properties.setProperty("fetch.max.wait.ms", Long.toString(config.getDuration("kafka.consumer.fetchMaxWait", TimeUnit.MILLISECONDS)));
        properties.setProperty("max.poll.interval.ms", Long.toString(config.getDuration("kafka.consumer.maxPollInterval", TimeUnit.MILLISECONDS)));
        if (config.hasPath("kafka.consumer.clientRack")) {
            properties.setProperty("client.rack", config.getString("kafka.consumer.clientRack"));
        }
        ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
        Thread.currentThread().setContextClassLoader(null);
        try {
            KafkaConsumer<byte[], byte[]> kafkaConsumer = new KafkaConsumer<>(properties);
            Thread.currentThread().setContextClassLoader(contextClassLoader);
            return kafkaConsumer;
        } catch (Throwable th) {
            Thread.currentThread().setContextClassLoader(contextClassLoader);
            throw th;
        }
    }

    public String toString() {
        return MoreObjects.toStringHelper("KafkaTopicSubscriber").add("names", this.topicNames).toString();
    }
}
