package net.bluemind.core.backup.store.kafka;

import com.google.common.base.MoreObjects;
import com.google.common.util.concurrent.RateLimiter;
import com.typesafe.config.Config;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import net.bluemind.core.backup.continuous.store.TopicPublisher;
import net.bluemind.core.backup.store.kafka.config.KafkaStoreConfig;
import net.bluemind.core.backup.store.kafka.metrics.KafkaMetric;
import net.bluemind.core.backup.store.kafka.metrics.KafkaTopicMetrics;
import net.bluemind.lib.vertx.VertxPlatform;
import net.bluemind.lifecycle.helper.SoftReset;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:net/bluemind/core/backup/store/kafka/KafkaTopicPublisher.class */
public class KafkaTopicPublisher implements TopicPublisher {
    private final String bootstrapServer;
    private final String physicalTopic;
    private final KafkaProducer<byte[], byte[]> producer;
    private final RateLimiter metricLimiter;
    private final Metric recordSendRate;
    private static final Logger logger = LoggerFactory.getLogger(KafkaTopicPublisher.class);
    static final Map<String, KafkaProducer<byte[], byte[]>> perPhyTopicProd = perTopicProducers();
    private static final Map<String, RateLimiter> metricsLimiter = new ConcurrentHashMap();
    private static final Executor IN_ORDER = Executors.newSingleThreadExecutor();

    private static Map<String, KafkaProducer<byte[], byte[]>> perTopicProducers() {
        ConcurrentHashMap concurrentHashMap = new ConcurrentHashMap();
        SoftReset.register(() -> {
            perPhyTopicProd.values().removeIf(kafkaProducer -> {
                kafkaProducer.close();
                logger.info("{} closed.", kafkaProducer);
                return true;
            });
        });
        return concurrentHashMap;
    }

    public KafkaTopicPublisher(String str, String str2) {
        this.bootstrapServer = str;
        this.physicalTopic = str2;
        this.producer = perPhyTopicProd.computeIfAbsent(str2, str3 -> {
            return createKafkaProducer();
        });
        this.metricLimiter = metricsLimiter.computeIfAbsent(str2, str4 -> {
            return RateLimiter.create(1.0d);
        });
        this.recordSendRate = (Metric) this.producer.metrics().entrySet().stream().filter(entry -> {
            return KafkaTopicMetrics.SEND_RATE.equals(((MetricName) entry.getKey()).name()) && "producer-metrics".equals(((MetricName) entry.getKey()).group());
        }).map(entry2 -> {
            return (Metric) entry2.getValue();
        }).findFirst().orElseThrow();
    }

    public CompletableFuture<Void> store(String str, byte[] bArr, byte[] bArr2) {
        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
        int abs = Math.abs(str.hashCode() % KafkaTopicStore.PARTITION_COUNT);
        ProducerRecord producerRecord = new ProducerRecord(this.physicalTopic, Integer.valueOf(abs), bArr, bArr2);
        IN_ORDER.execute(() -> {
            try {
                logger.trace("submit {}", producerRecord);
                this.producer.send(producerRecord, (recordMetadata, exc) -> {
                    if (exc == null) {
                        logger.debug("[{}] stored part: {}, meta: {}", new Object[]{this.physicalTopic, Integer.valueOf(abs), recordMetadata});
                        completableFuture.complete(null);
                        return;
                    }
                    Logger logger2 = logger;
                    Object[] objArr = new Object[3];
                    objArr[0] = Integer.valueOf(bArr2 == null ? 0 : bArr2.length);
                    objArr[1] = new String(bArr);
                    objArr[2] = exc.getMessage();
                    logger2.error("Could not store {}byte(s) of data. Key: {}, ({}) CRITICAL FAILURE", objArr);
                    completableFuture.completeExceptionally(exc);
                });
            } catch (Exception e) {
                logger.error("Producer error", e);
                completableFuture.completeExceptionally(e);
            }
        });
        if (this.metricLimiter.tryAcquire()) {
            Object metricValue = this.recordSendRate.metricValue();
            if (metricValue instanceof Double) {
                VertxPlatform.eventBus().publish("bm.monitoring.fw.kafka.metrics", new KafkaMetric(this.physicalTopic, KafkaTopicMetrics.SEND_RATE, ((Double) metricValue).longValue(), KafkaTopicMetrics.ClientEnum.PRODUCER.name()).toJsonObj());
            }
        }
        return completableFuture;
    }

    private KafkaProducer<byte[], byte[]> createKafkaProducer() {
        Config config = KafkaStoreConfig.get();
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", this.bootstrapServer);
        properties.setProperty("compression.type", "zstd");
        properties.setProperty("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        properties.setProperty("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        properties.setProperty("max.in.flight.requests.per.connection", Integer.toString(1));
        properties.setProperty("enable.idempotence", "true");
        properties.setProperty("acks", config.getString("kafka.producer.acks"));
        properties.setProperty("linger.ms", Long.toString(config.getDuration("kafka.producer.linger", TimeUnit.MILLISECONDS)));
        properties.setProperty("buffer.memory", Long.toString(config.getMemorySize("kafka.producer.bufferMemory").toBytes()));
        properties.setProperty("batch.size", Long.toString(config.getMemorySize("kafka.producer.batchSize").toBytes()));
        properties.setProperty("max.request.size", Long.toString(config.getMemorySize("kafka.producer.maxRecordSize").toBytes()));
        ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
        Thread.currentThread().setContextClassLoader(null);
        try {
            KafkaProducer<byte[], byte[]> kafkaProducer = new KafkaProducer<>(properties);
            Thread.currentThread().setContextClassLoader(contextClassLoader);
            return kafkaProducer;
        } catch (Throwable th) {
            Thread.currentThread().setContextClassLoader(contextClassLoader);
            throw th;
        }
    }

    public String toString() {
        return MoreObjects.toStringHelper("KafkaTopic").add("name", this.physicalTopic).add("prod", this.producer).toString();
    }
}
