package net.bluemind.core.auditlogs.client.kafka;

import com.typesafe.config.Config;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import net.bluemind.core.auditlogs.AuditLogEntry;
import net.bluemind.core.auditlogs.IAuditLogClient;
import net.bluemind.core.auditlogs.client.kafka.config.AuditLogKafkaConfig;
import net.bluemind.core.backup.store.kafka.KafkaTopicStore;
import net.bluemind.core.backup.store.kafka.config.KafkaStoreConfig;
import net.bluemind.core.utils.JsonUtils;
import net.bluemind.lifecycle.helper.SoftReset;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:net/bluemind/core/auditlogs/client/kafka/KafkaAuditLogClient.class */
/**
 * {@link IAuditLogClient} implementation that publishes audit log entries to Kafka.
 *
 * <p>Producers are cached per topic name in a static map shared by every instance of this
 * class. A callback registered with {@link SoftReset} closes and removes all cached producers.
 */
public class KafkaAuditLogClient implements IAuditLogClient {
    private static final Logger logger = LoggerFactory.getLogger(KafkaAuditLogClient.class);

    /** Topic name to producer cache, shared by all clients. */
    private static final Map<String, KafkaProducer<byte[], byte[]>> perPhyTopicProd = initProdCache();

    private final KafkaAuditLogMngt manager;
    private final String bootstrap;

    /** Serializes audit log keys and entries to bytes through {@link JsonUtils} writers. */
    private static class AuditLogSerializer {
        private final JsonUtils.ValueWriter keyWriter = JsonUtils.writer(AuditLogKey.class);
        private final JsonUtils.ValueWriter valueWriter = JsonUtils.writer(AuditLogEntry.class);

        /**
         * @param auditLogKey the record key to serialize
         * @return the bytes produced by the key writer
         */
        public byte[] key(AuditLogKey auditLogKey) {
            return this.keyWriter.write(auditLogKey);
        }

        /**
         * @param auditLogEntry the audit log entry to serialize
         * @return the bytes produced by the value writer
         */
        public byte[] value(AuditLogEntry auditLogEntry) {
            return this.valueWriter.write(auditLogEntry);
        }
    }

    /**
     * Creates the producer cache and registers the {@link SoftReset} callback that empties it.
     *
     * @return an empty, concurrent topic-to-producer map
     */
    private static Map<String, KafkaProducer<byte[], byte[]>> initProdCache() {
        // Fully parameterized: with a raw map the lambda parameter below would be typed
        // as Object and close() would not resolve.
        Map<String, KafkaProducer<byte[], byte[]>> cache = new ConcurrentHashMap<>();
        SoftReset.register(() -> {
            // Close each producer and drop it from the cache in the same pass.
            cache.values().removeIf(producer -> {
                producer.close();
                return true;
            });
        });
        return cache;
    }

    /**
     * @param str the value used for the producer's {@code bootstrap.servers} property
     * @param kafkaAuditLogMngt used to check for, and create, the topic of a domain
     */
    public KafkaAuditLogClient(String str, KafkaAuditLogMngt kafkaAuditLogMngt) {
        this.bootstrap = str;
        this.manager = kafkaAuditLogMngt;
    }

    /**
     * Serializes the entry and hands it to the producer of the entry's domain topic.
     *
     * <p>The send is asynchronous: this method returns once the record has been passed to the
     * producer, and the outcome is only reported through logging in the send callback.
     *
     * @param auditLogEntry the entry to publish; its {@code domainUid} selects the topic
     */
    public void storeAuditLog(AuditLogEntry auditLogEntry) {
        String topic = AuditLogKafkaConfig.getTopic(auditLogEntry.domainUid);
        AuditLogSerializer serializer = new AuditLogSerializer();

        // First use of a topic: make sure it exists, then build and cache its producer.
        KafkaProducer<byte[], byte[]> producer = perPhyTopicProd.computeIfAbsent(topic, unused -> {
            if (!this.manager.hasKafkaTopicForDomainUid(auditLogEntry.domainUid)) {
                this.manager.createKafkaTopic(topic);
            }
            return createKafkaProducer();
        });

        AuditLogKey key = generateKey(auditLogEntry);
        // abs() is applied after the modulo, so the result is always within
        // [0, PARTITION_COUNT) and cannot overflow.
        int partition = Math.abs(key.containerUid().hashCode() % KafkaTopicStore.PARTITION_COUNT);
        byte[] value = serializer.value(auditLogEntry);
        ProducerRecord<byte[], byte[]> producerRecord =
                new ProducerRecord<>(topic, partition, serializer.key(key), value);

        producer.send(producerRecord, (metadata, error) -> {
            if (error == null) {
                logger.debug("[{}] stored part: {}, meta: {}", topic, partition, metadata);
                return;
            }
            logger.warn("Could not store {}byte(s) of data. Key: {}, ({})",
                    value == null ? 0 : value.length, key, error.getMessage());
        });
    }

    /**
     * Builds a byte-array producer from {@link KafkaStoreConfig} and the bootstrap servers of
     * this client.
     *
     * @return a new producer; the caller is responsible for caching and closing it
     */
    private KafkaProducer<byte[], byte[]> createKafkaProducer() {
        Config config = KafkaStoreConfig.get();
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", this.bootstrap);
        properties.setProperty("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        properties.setProperty("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        properties.setProperty("max.in.flight.requests.per.connection", Integer.toString(1));
        properties.setProperty("enable.idempotence", "true");
        properties.setProperty("acks", config.getString("kafka.producer.acks"));
        properties.setProperty("linger.ms", Long.toString(config.getDuration("kafka.producer.linger", TimeUnit.MILLISECONDS)));
        properties.setProperty("buffer.memory", Long.toString(config.getMemorySize("kafka.producer.bufferMemory").toBytes()));
        properties.setProperty("batch.size", Long.toString(config.getMemorySize("kafka.producer.batchSize").toBytes()));
        properties.setProperty("max.request.size", Long.toString(config.getMemorySize("kafka.producer.maxRecordSize").toBytes()));

        // The context class loader is cleared while the producer is constructed and always
        // restored afterwards.
        // NOTE(review): presumably this keeps Kafka from resolving its classes through the
        // caller's context class loader - confirm.
        Thread currentThread = Thread.currentThread();
        ClassLoader savedLoader = currentThread.getContextClassLoader();
        currentThread.setContextClassLoader(null);
        try {
            return new KafkaProducer<>(properties);
        } finally {
            currentThread.setContextClassLoader(savedLoader);
        }
    }

    /**
     * Builds the record key for an entry.
     *
     * @param auditLogEntry the entry being published
     * @return a key made of the container uid ({@code "__orphan__"} when the entry has no
     *         container), the item uid (the domain uid when the entry has no item) and the
     *         current {@link System#nanoTime()} value
     */
    private AuditLogKey generateKey(AuditLogEntry auditLogEntry) {
        String containerUid = auditLogEntry.container == null ? "__orphan__" : auditLogEntry.container.uid();
        String itemUid = auditLogEntry.item == null ? auditLogEntry.domainUid : auditLogEntry.item.uid();
        return new AuditLogKey(containerUid, itemUid, System.nanoTime());
    }
}
