package net.bluemind.core.backup.store.kafka;

import com.netflix.spectator.api.Registry;
import com.typesafe.config.Config;
import java.io.File;
import java.io.InputStream;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;
import java.lang.runtime.ObjectMethods;
import java.nio.file.Files;
import java.nio.file.OpenOption;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import net.bluemind.config.DataLocation;
import net.bluemind.config.InstallationId;
import net.bluemind.core.api.fault.ServerFault;
import net.bluemind.core.backup.continuous.model.TopicDescriptor;
import net.bluemind.core.backup.continuous.store.ITopicStore;
import net.bluemind.core.backup.continuous.store.TopicManager;
import net.bluemind.core.backup.continuous.store.TopicPublisher;
import net.bluemind.core.backup.continuous.store.TopicSubscriber;
import net.bluemind.core.backup.store.kafka.config.KafkaStoreConfig;
import net.bluemind.lifecycle.helper.SoftReset;
import net.bluemind.metrics.registry.IdFactory;
import net.bluemind.metrics.registry.MetricsRegistry;
import org.apache.curator.shaded.com.google.common.collect.Streams;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.CreateTopicsOptions;
import org.apache.kafka.clients.admin.DeleteTopicsOptions;
import org.apache.kafka.clients.admin.ListTopicsOptions;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.TopicExistsException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:net/bluemind/core/backup/store/kafka/KafkaTopicStore.class */
public class KafkaTopicStore implements ITopicStore, TopicManager {
    /** Value used for the {@code compression.type} setting of topics created by this store. */
    static final String COMPRESSION_TYPE = "zstd";

    private static final Logger logger = LoggerFactory.getLogger(KafkaTopicStore.class);

    /** Counter appended to the admin client id so each created client gets a distinct id. */
    private static final AtomicInteger cidAlloc = new AtomicInteger();

    /** Partition count for new topics, read from {@code kafka.topic.partitionCount}. */
    public static final int PARTITION_COUNT = KafkaStoreConfig.get().getInt("kafka.topic.partitionCount");

    /** Replication factor for new topics, read from {@code kafka.topic.replicationFactor}. */
    static final short REPL_FACTOR = (short) KafkaStoreConfig.get().getInt("kafka.topic.replicationFactor");

    /** Factory producing a fresh admin client per call; {@code null} when the bootstrap is not valid. */
    private final Supplier<AdminClient> adminClient;

    private final Registry reg;

    /** Publishers already handed out, keyed by the descriptor they were requested for. */
    private final Map<TopicDescriptor, KafkaTopicPublisher> knownPublisher = new ConcurrentHashMap<>();

    /** Broker / zookeeper addresses resolved once, when the instance is initialised. */
    private final Bootstrap bootstrap = kafkaBootstrapServers();

    /**
     * Pair of connection strings needed to reach the Kafka installation.
     *
     * <p>Declared as a real record: the compiler generates the accessors,
     * {@code equals}, {@code hashCode} and {@code toString} that the decompiled
     * form tried to spell out by hand (which is not legal Java source, since
     * {@code java.lang.Record} cannot be extended explicitly).
     *
     * @param zookeeper zookeeper address list, may be {@code null} when not configured
     * @param brokers   kafka bootstrap server list, may be {@code null} when not configured
     */
    public record Bootstrap(String zookeeper, String brokers) {

        /**
         * @return {@code true} only when both the zookeeper and the broker
         *         addresses are present
         */
        public boolean valid() {
            return zookeeper != null && brokers != null;
        }
    }

    public KafkaTopicStore() {
        String current = DataLocation.current();
        logger.warn("kafka.bootstrap {}, zk {}", this.bootstrap.brokers(), this.bootstrap.zookeeper());
        if (this.bootstrap.valid()) {
            this.adminClient = () -> {
                Properties properties = new Properties();
                properties.put("bootstrap.servers", this.bootstrap.brokers());
                properties.put("client.id", jvm() + "_" + InstallationId.getIdentifier() + "_" + current + "_" + cidAlloc.incrementAndGet());
                return AdminClient.create(properties);
            };
        } else {
            this.adminClient = null;
        }
        this.reg = MetricsRegistry.get();
        SoftReset.register(this::flushAll);
    }

    /**
     * @return the value of the {@code net.bluemind.property.product} system
     *         property, or {@code "unknown"} when it is not set
     */
    private String jvm() {
        final String product = System.getProperty("net.bluemind.property.product", "unknown");
        return product;
    }

    /**
     * Resolves the kafka broker and zookeeper addresses.
     *
     * <p>The system properties {@code bm.kafka.bootstrap.servers} and
     * {@code bm.zk.servers} are used when both are set. Otherwise the values
     * are read from {@code /etc/bm/kafka.properties}, or from
     * {@code kafka.properties} in the user home when the former does not exist
     * (keys {@code bootstrap.servers} and {@code zookeeper.servers}).
     *
     * <p>A failure to read the file is logged and not propagated; the values
     * obtained from the system properties are kept in that case.
     *
     * @return the resolved pair; either component may be {@code null}
     */
    private Bootstrap kafkaBootstrapServers() {
        String brokers = System.getProperty("bm.kafka.bootstrap.servers");
        String zookeeper = System.getProperty("bm.zk.servers");
        if (brokers != null && zookeeper != null) {
            return new Bootstrap(zookeeper, brokers);
        }

        File file = new File("/etc/bm/kafka.properties");
        if (!file.exists()) {
            file = new File(System.getProperty("user.home") + "/kafka.properties");
        }
        if (file.exists()) {
            Properties fromFile = new Properties();
            // try-with-resources closes the stream on every path, including a failed load
            try (InputStream in = Files.newInputStream(file.toPath())) {
                fromFile.load(in);
                brokers = fromFile.getProperty("bootstrap.servers");
                zookeeper = fromFile.getProperty("zookeeper.servers");
            } catch (Exception e) {
                // best effort: configuration problems must not prevent construction
                logger.warn("Unable to read {}: {}", file, e.getMessage(), e);
            }
        }
        return new Bootstrap(zookeeper, brokers);
    }

    /**
     * @return {@code true} when an admin client factory was set up by the
     *         constructor, {@code false} otherwise
     */
    public boolean isEnabled() {
        if (this.adminClient == null) {
            return false;
        }
        return true;
    }

    /**
     * Lists the names of the topics reported by the admin client, with
     * internal topics excluded from the request.
     *
     * @return the topic names
     * @throws ServerFault wrapping any exception raised while creating the
     *                     admin client or fetching the listing
     */
    public Set<String> topicNames() {
        // a fresh admin client is created per call and closed on every exit path
        try (AdminClient admin = this.adminClient.get()) {
            ListTopicsOptions options = new ListTopicsOptions();
            options.listInternal(false);
            Set<String> names = admin.listTopics(options).namesToListings().get().keySet();
            logger.info("topic names:{}", names);
            return names;
        } catch (InterruptedException e) {
            // keep the interrupt visible to the caller's thread before reporting the failure
            Thread.currentThread().interrupt();
            throw new ServerFault(e);
        } catch (Exception e) {
            throw new ServerFault(e);
        }
    }

    /**
     * Lists the topics whose name starts with a prefix derived from
     * {@code str}: every {@code "bluemind-"} occurrence is removed first, then
     * every remaining {@code "-"}.
     *
     * @param str value the prefix is derived from
     * @return the matching subset of {@link #topicNames()}
     */
    public Set<String> topicNames(String str) {
        final String prefix = str.replace("bluemind-", "").replace("-", "");
        Stream<String> matching = topicNames().stream().filter(name -> name.startsWith(prefix));
        return matching.collect(Collectors.toSet());
    }

    /**
     * Creates a subscriber for {@code str} followed by the topics in
     * {@code strArr}, in that order.
     *
     * @param str    first topic name
     * @param strArr additional topic names
     * @return a new {@link KafkaTopicSubscriber} connected to the configured brokers
     */
    public TopicSubscriber getSubscriber(String str, String... strArr) {
        String brokers = this.bootstrap.brokers();
        var topics = Stream.concat(Stream.of(str), Arrays.stream(strArr)).toList();
        IdFactory consumerIds = new IdFactory("kafka.consumer", this.reg, KafkaTopicSubscriber.class);
        return new KafkaTopicSubscriber(brokers, topics, this.reg, consumerIds);
    }

    /**
     * Returns the publisher registered for the descriptor, creating it through
     * {@link #createImpl(TopicDescriptor)} when none is registered yet.
     *
     * @param topicDescriptor descriptor identifying the topic
     * @return the publisher for that descriptor
     */
    public TopicPublisher getPublisher(TopicDescriptor topicDescriptor) {
        KafkaTopicPublisher publisher = this.knownPublisher.computeIfAbsent(topicDescriptor,
                descriptor -> createImpl(descriptor));
        return publisher;
    }

    /**
     * Makes sure the physical topic of the descriptor exists, then builds a
     * publisher bound to it.
     *
     * @param topicDescriptor descriptor whose physical topic is used
     * @return a new publisher for the physical topic
     */
    private KafkaTopicPublisher createImpl(TopicDescriptor topicDescriptor) {
        final String topic = topicDescriptor.physicalTopic();
        logger.debug("{} bound to physical topic '{}'", topicDescriptor, topic);
        ensureKafkaTopic(topic);
        final String brokers = this.bootstrap.brokers();
        return new KafkaTopicPublisher(brokers, topic);
    }

    /**
     * Creates the topic {@code str} when the admin client does not list it yet.
     *
     * <p>Nothing is done when {@code KafkaTopicPublisher.perPhyTopicProd}
     * already has an entry for the topic. A creation that fails because the
     * topic already exists is ignored.
     *
     * @param str physical topic name
     * @throws ServerFault wrapping any other failure
     */
    private void ensureKafkaTopic(String str) {
        if (KafkaTopicPublisher.perPhyTopicProd.containsKey(str)) {
            return;
        }
        // a fresh admin client is created per call and closed on every exit path
        try (AdminClient admin = this.adminClient.get()) {
            ListTopicsOptions options = new ListTopicsOptions();
            options.listInternal(false);
            if (admin.listTopics(options).namesToListings().get().containsKey(str)) {
                return;
            }

            Config config = KafkaStoreConfig.get();
            // the topic-level limit is the producer record limit plus a 5% margin
            long maxMessageBytes = (long) (config.getMemorySize("kafka.producer.maxRecordSize").toBytes() * 1.05d);
            NewTopic newTopic = new NewTopic(str, PARTITION_COUNT, REPL_FACTOR);
            newTopic.configs(Map.of( //
                    "min.insync.replicas", Integer.toString(config.getInt("kafka.topic.minIsr")), //
                    "max.message.bytes", Long.toString(maxMessageBytes), //
                    "compression.type", COMPRESSION_TYPE, //
                    "cleanup.policy", "compact", //
                    "max.compaction.lag.ms",
                    Long.toString(config.getDuration("kafka.topic.maxCompactionLag", TimeUnit.MILLISECONDS)), //
                    "segment.ms",
                    Long.toString(config.getDuration("kafka.topic.maxSegmentDuration", TimeUnit.MILLISECONDS))));

            Uuid topicId = admin.createTopics(Arrays.asList(newTopic), new CreateTopicsOptions()).topicId(str).get();
            logger.info("Created topic {}: {}", str, topicId);
        } catch (ExecutionException e) {
            // the topic may have been created between the listing and the creation: not an error
            if (!(e.getCause() instanceof TopicExistsException)) {
                throw new ServerFault(e);
            }
        } catch (InterruptedException e) {
            // keep the interrupt visible to the caller's thread before reporting the failure
            Thread.currentThread().interrupt();
            throw new ServerFault(e);
        } catch (Exception e) {
            throw new ServerFault(e);
        }
    }

    /**
     * Deletes the topic {@code str}.
     *
     * <p>Publishers registered for that physical topic are forgotten and the
     * matching producer is closed before the deletion is requested. The call
     * then waits for the deletion to finish, bounded by a 30 second timeout.
     * A failed deletion is logged, not rethrown.
     *
     * @param str physical topic name
     */
    public void delete(String str) {
        // a fresh admin client is created per call and closed on every exit path
        try (AdminClient admin = this.adminClient.get()) {
            this.knownPublisher.keySet().removeIf(descriptor -> descriptor.physicalTopic().equals(str));
            closePublisher(str);
            admin.deleteTopics(Collections.singleton(str), new DeleteTopicsOptions()) //
                    .all() //
                    .toCompletionStage() //
                    .thenAccept(done -> logger.info("Topic {} deleted.", str)) //
                    .exceptionally(failure -> {
                        logger.error("Deletion of {} failed ({})", str, failure.getMessage(), failure);
                        return null;
                    }) //
                    .toCompletableFuture() //
                    .orTimeout(30L, TimeUnit.SECONDS) //
                    .join();
        }
    }

    /**
     * Forgets every publisher registered for the physical topic {@code str}
     * and closes the matching producer, if any.
     *
     * @param str physical topic name
     */
    public void flush(String str) {
        this.knownPublisher.keySet().removeIf(descriptor -> descriptor.physicalTopic().equals(str));
        closePublisher(str);
    }

    /**
     * Removes the producer stored for {@code str} in
     * {@code KafkaTopicPublisher.perPhyTopicProd} and, when there was one,
     * closes it with a 20 second timeout.
     *
     * @param str physical topic name
     */
    private void closePublisher(String str) {
        var producer = KafkaTopicPublisher.perPhyTopicProd.remove(str);
        if (producer == null) {
            return;
        }
        logger.info("Closing {}", producer);
        producer.close(Duration.ofSeconds(20L));
    }

    /**
     * Logs how many publishers are currently registered, then empties the
     * registry. Registered with {@link SoftReset} by the constructor.
     */
    private void flushAll() {
        final int known = this.knownPublisher.size();
        logger.info("Clear {} publisher(s)", known);
        this.knownPublisher.clear();
    }

    /**
     * Not implemented: only logs the topic name. The supplied settings are
     * not read.
     *
     * @param str topic name, used in the log message
     * @param map requested settings, ignored
     */
    public void reconfigure(String str, Map<String, String> map) {
        logger.info("reconfigure {} is not implemented.", str);
    }

    /**
     * @return this instance, which implements {@link TopicManager} itself
     */
    public TopicManager getManager() {
        return this;
    }
}
