package net.bluemind.central.reverse.proxy.stream;

import com.typesafe.config.Config;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.eventbus.MessageProducer;
import io.vertx.core.json.JsonObject;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import net.bluemind.central.reverse.proxy.common.ProxyEventBusAddress;
import net.bluemind.central.reverse.proxy.model.common.kafka.InstallationTopics;
import net.bluemind.central.reverse.proxy.model.common.kafka.KafkaAdminClient;
import net.bluemind.central.reverse.proxy.model.common.mapper.RecordKeyMapper;
import org.apache.kafka.clients.admin.CreateTopicsOptions;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.TaskMetadata;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.api.Record;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:net/bluemind/central/reverse/proxy/stream/DirEntriesStreamVerticle.class */
public class DirEntriesStreamVerticle extends AbstractVerticle {
    private static final Logger logger = LoggerFactory.getLogger(DirEntriesStreamVerticle.class);
    private final Config config;
    private final String bootstrapServers;
    private final RecordKeyMapper<byte[]> keyMapper;
    private KafkaAdminClient adminClient;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:net/bluemind/central/reverse/proxy/stream/DirEntriesStreamVerticle$PKSerDes.class */
    /**
     * Serde for {@link RecordKeyMapper.PartitionnedKey} wrapping raw key bytes.
     *
     * <p>Only the key bytes travel on the wire: serialization writes {@code key()} and drops the
     * partition, so deserialization rebuilds the key with a placeholder partition of {@code -1}.
     */
    public static class PKSerDes implements Serde<RecordKeyMapper.PartitionnedKey<byte[]>> {
        private PKSerDes() {
        }

        /** Returns a serializer that emits the wrapped key bytes unchanged. */
        public Serializer<RecordKeyMapper.PartitionnedKey<byte[]>> serializer() {
            return (topic, data) -> data.key();
        }

        /** Returns a deserializer that wraps the raw bytes with partition {@code -1}. */
        public Deserializer<RecordKeyMapper.PartitionnedKey<byte[]>> deserializer() {
            return (topic, raw) -> new RecordKeyMapper.PartitionnedKey<>(-1, raw);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:net/bluemind/central/reverse/proxy/stream/DirEntriesStreamVerticle$PartitionDecorator.class */
    /**
     * Processor supplier that re-keys every record with a {@link RecordKeyMapper.PartitionnedKey}
     * carrying the partition reported by the record metadata, and exposes a future that completes
     * as soon as any processor created by this supplier has seen its first record.
     */
    public static class PartitionDecorator implements ProcessorSupplier<byte[], byte[], RecordKeyMapper.PartitionnedKey<byte[]>, byte[]> {
        private final CompletableFuture<Void> firstRecordProcessed = new CompletableFuture<>();

        private PartitionDecorator() {
        }

        /**
         * Implements {@code ProcessorSupplier#get()}.
         *
         * @return a new processor that forwards each record under a partition-aware key
         */
        public Processor<byte[], byte[], RecordKeyMapper.PartitionnedKey<byte[]>, byte[]> get() {
            return new Processor<byte[], byte[], RecordKeyMapper.PartitionnedKey<byte[]>, byte[]>() {
                private ProcessorContext<RecordKeyMapper.PartitionnedKey<byte[]>, byte[]> ctx;

                public void init(ProcessorContext<RecordKeyMapper.PartitionnedKey<byte[]>, byte[]> processorContext) {
                    this.ctx = processorContext;
                }

                public void process(Record<byte[], byte[]> rec) {
                    // complete() returns true only for the call that actually completes the future,
                    // so this is logged at most once per supplier.
                    if (firstRecordProcessed.complete(null)) {
                        logger.info("[stream] First record processed.");
                    }
                    // Records without metadata are logged and not forwarded.
                    ctx.recordMetadata().ifPresentOrElse(
                            metadata -> ctx.forward(rec.withKey(
                                    new RecordKeyMapper.PartitionnedKey<byte[]>(metadata.partition(), rec.key()))),
                            () -> logger.warn("No metadata for {}", rec));
                }
            };
        }

        /**
         * @return the future completed when the first record is processed
         */
        public CompletableFuture<Void> firstRecordPromise() {
            return this.firstRecordProcessed;
        }
    }

    /* loaded from: input_file:net/bluemind/central/reverse/proxy/stream/DirEntriesStreamVerticle$StreamMonitor.class */
    private static class StreamMonitor implements KafkaStreams.StateListener {
        private final Vertx vx;
        private final KafkaStreams stream;
        private long periodic;
        private final MessageProducer<String> statePub;
        private final MessageProducer<Long> lagPub;

        public StreamMonitor(KafkaStreams kafkaStreams, Vertx vertx) {
            this.stream = kafkaStreams;
            this.vx = vertx;
            this.statePub = vertx.eventBus().publisher("dir.stream.state");
            this.lagPub = vertx.eventBus().publisher("dir.stream.lag");
        }

        public void onChange(KafkaStreams.State state, KafkaStreams.State state2) {
            this.statePub.write(state.name());
            if (state == KafkaStreams.State.RUNNING) {
                this.periodic = this.vx.setPeriodic(5000L, l -> {
                    ConcurrentHashMap concurrentHashMap = new ConcurrentHashMap();
                    this.stream.metadataForLocalThreads().forEach(threadMetadata -> {
                        for (TaskMetadata taskMetadata : threadMetadata.activeTasks()) {
                            Map committedOffsets = taskMetadata.committedOffsets();
                            Map endOffsets = taskMetadata.endOffsets();
                            committedOffsets.forEach((topicPartition, l) -> {
                                long longValue = ((Long) endOffsets.getOrDefault(topicPartition, l)).longValue() - l.longValue();
                                concurrentHashMap.put(String.valueOf(taskMetadata.taskId()) + "-" + topicPartition.topic() + "#" + topicPartition.partition(), Long.valueOf(longValue < 0 ? 0L : longValue));
                            });
                        }
                    });
                    this.lagPub.write(Long.valueOf(concurrentHashMap.reduceValuesToLong(8L, (v0) -> {
                        return v0.longValue();
                    }, 0L, (j, j2) -> {
                        return j + j2;
                    })));
                });
            } else if (state2 == KafkaStreams.State.RUNNING) {
                this.vx.cancelTimer(this.periodic);
            }
        }
    }

    /**
     * Creates the verticle.
     *
     * @param config          configuration; {@code bm.kafka.bootstrap.servers} is read eagerly here
     * @param recordKeyMapper mapper used to decode and re-encode record keys
     */
    public DirEntriesStreamVerticle(Config config, RecordKeyMapper<byte[]> recordKeyMapper) {
        this.keyMapper = recordKeyMapper;
        this.config = config;
        this.bootstrapServers = config.getString("bm.kafka.bootstrap.servers");
    }

    /**
     * Creates the Kafka admin client and kicks off the stream setup.
     *
     * <p>The promise is completed right after the setup chain has been triggered; it does not wait
     * for the stream to actually be started.
     *
     * @param promise start promise, completed once setup has been triggered
     */
    public void start(Promise<Void> promise) {
        adminClient = KafkaAdminClient.create(bootstrapServers);
        logger.info("[stream] Starting");
        tryStart();
        promise.complete();
    }

    /**
     * Lists the Kafka topics, keeps those allowed by the forest white list, makes sure the output
     * topic exists and starts the stream. On any failure in that chain the whole attempt is
     * rescheduled after 5 seconds.
     */
    private void tryStart() {
        // The loader is only needed to compute the white list; the asynchronous chain below
        // captures the resulting set, so the loader can be closed as soon as the chain is set up.
        try (ForestInstancesLoader forestInstancesLoader = new ForestInstancesLoader(this.config)) {
            Set<String> whiteListed = forestInstancesLoader.whiteListedInstances().stream()
                    .map(instanceId -> instanceId.replace("-", ""))
                    .collect(Collectors.toSet());
            this.adminClient.listTopics()
                    .map(topicNames -> onlyWhiteListed(whiteListed, topicNames, this.config))
                    .map(topicNames -> new InstallationTopics(topicNames, this.config.getString("bm.crp.topic.name-suffix")))
                    .compose(this::ensureStreamOutputTopicExists)
                    .map(this::streamDirEntries)
                    .onSuccess(installationTopics -> logger.info("[stream] Started"))
                    .onFailure(failure -> {
                        logger.error("[stream] Failed to setup dir entries stream, retrying in 5sec", failure);
                        this.vertx.setTimer(5000L, timerId -> tryStart());
                    });
        }
    }

    /**
     * Restricts topic names to those starting with a white-listed prefix when forest enforcement
     * is on.
     *
     * <p>Enforcement is read from the {@code bm.crp.stream.enforce-forest} system property, falling
     * back to the configuration value of the same name.
     *
     * @param set    white-listed prefixes
     * @param set2   candidate topic names
     * @param config configuration providing the default enforcement flag
     * @return {@code set2} itself when enforcement is off, otherwise a new filtered set
     */
    private Set<String> onlyWhiteListed(Set<String> set, Set<String> set2, Config config) {
        String enforceForest = System.getProperty(
                "bm.crp.stream.enforce-forest",
                Boolean.toString(config.getBoolean("bm.crp.stream.enforce-forest")));
        if (!"true".equalsIgnoreCase(enforceForest)) {
            return set2;
        }
        return set2.stream()
                .filter(topicName -> set.stream().anyMatch(topicName::startsWith))
                .collect(Collectors.toSet());
    }

    /**
     * Creates the stream output topic when {@code installationTopics} reports it as missing.
     *
     * @param installationTopics the topics known for this installation
     * @return a future yielding the same {@code installationTopics}, completed immediately if the
     *         topic already exists or once the admin client has created it
     */
    private Future<InstallationTopics> ensureStreamOutputTopicExists(InstallationTopics installationTopics) {
        if (!installationTopics.hasCrpTopic) {
            int partitionCount = this.config.getInt("bm.crp.topic.partition-count");
            short replicationFactor = this.config.getNumber("bm.crp.topic.replication-factor").shortValue();
            NewTopic crpTopic = new NewTopic(installationTopics.crpTopicName, partitionCount, replicationFactor);
            Map<String, String> topicSettings = Map.of(
                    "compression.type", this.config.getString("bm.crp.topic.compression-type"),
                    "cleanup.policy", this.config.getString("bm.crp.topic.cleanup-policy"),
                    "max.compaction.lag.ms", this.config.getString("bm.crp.topic.max-compaction-lag-ms"));
            crpTopic.configs(topicSettings);
            return this.adminClient.createTopic(crpTopic, new CreateTopicsOptions())
                    .map(ignored -> installationTopics);
        }
        return Future.succeededFuture(installationTopics);
    }

    /**
     * Builds and starts the Kafka Streams topology that copies directory records from the domain
     * topics to the output topic, then schedules the "stream ready" announcement.
     *
     * <p>Topology: keep records with a non-null value whose key type is {@code dir} or
     * {@code memberships}; tag each key with its source partition; for {@code DELETE} keys also
     * emit null-valued records under the matching {@code UPDATE} and {@code CREATE} keys; write
     * everything to the output topic.
     *
     * @param installationTopics source (domain) topics and output topic name
     * @return the same {@code installationTopics}
     */
    private InstallationTopics streamDirEntries(InstallationTopics installationTopics) {
        Properties properties = streamProperties();
        Collection<String> sourceTopics = installationTopics.domainTopics.values();
        String outputTopic = installationTopics.crpTopicName;
        PartitionDecorator partitionDecorator = new PartitionDecorator();

        StreamsBuilder streamsBuilder = new StreamsBuilder();
        streamsBuilder.<byte[], byte[]>stream(sourceTopics)
                .filter(this::isDirectoryRecord)
                .process(partitionDecorator)
                .flatMap(this::withDeleteTombstones)
                .to(outputTopic, withProducer());

        KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(), properties);
        kafkaStreams.setStateListener(new StreamMonitor(kafkaStreams, this.vertx));
        kafkaStreams.setUncaughtExceptionHandler(failure -> {
            logger.error("[stream] Exception occurred during stream processing", failure);
            // NOTE(review): closing the client and then asking for a replacement thread looks
            // contradictory; behavior kept as-is, confirm whether SHUTDOWN_CLIENT was intended.
            kafkaStreams.close();
            return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD;
        });
        kafkaStreams.start();
        return publishTopics(partitionDecorator, installationTopics);
    }

    /** Builds the Kafka Streams client properties from the verticle configuration. */
    private Properties streamProperties() {
        Properties properties = new Properties();
        properties.put("application.id", this.config.getString("bm.crp.stream.application-id"));
        properties.put("bootstrap.servers", this.bootstrapServers);
        // Never run more stream threads than there are available processors.
        properties.put("num.stream.threads",
                Math.min(Runtime.getRuntime().availableProcessors(), this.config.getInt("bm.crp.stream.number-of-threads")));
        properties.put("default.key.serde", Serdes.ByteArraySerde.class.getName());
        properties.put("default.value.serde", Serdes.ByteArraySerde.class.getName());
        return properties;
    }

    /** Tells whether a record has a value and a decodable key of type "dir" or "memberships". */
    private boolean isDirectoryRecord(byte[] key, byte[] value) {
        return this.keyMapper.map(key)
                .map(recordKey -> value != null
                        && (recordKey.type().equals("dir") || recordKey.type().equals("memberships")))
                .orElse(false);
    }

    /**
     * Returns the record itself, preceded for DELETE keys by null-valued records for the
     * corresponding UPDATE and CREATE keys on the same partition.
     */
    private Collection<KeyValue<RecordKeyMapper.PartitionnedKey<byte[]>, byte[]>> withDeleteTombstones(
            RecordKeyMapper.PartitionnedKey<byte[]> key, byte[] value) {
        Collection<KeyValue<RecordKeyMapper.PartitionnedKey<byte[]>, byte[]>> output = new ArrayList<>(3);
        this.keyMapper.map(key)
                .filter(recordKey -> recordKey.operation().equals("DELETE"))
                .ifPresent(deleted -> {
                    this.keyMapper.map(deleted.withOperation("UPDATE"))
                            .map(keyBytes -> tombstone(key.part(), keyBytes))
                            .ifPresent(output::add);
                    this.keyMapper.map(deleted.withOperation("CREATE"))
                            .map(keyBytes -> tombstone(key.part(), keyBytes))
                            .ifPresent(output::add);
                });
        output.add(new KeyValue<>(key, value));
        return output;
    }

    /** Builds a null-valued record for the given key bytes on the given partition. */
    private static KeyValue<RecordKeyMapper.PartitionnedKey<byte[]>, byte[]> tombstone(int partition, byte[] keyBytes) {
        return new KeyValue<>(new RecordKeyMapper.PartitionnedKey<>(partition, keyBytes), null);
    }

    /**
     * Describes how records are written to the output topic: keys through {@link PKSerDes}, values
     * as raw bytes, and the target partition chosen as the key's partition modulo the number of
     * partitions of the output topic.
     */
    private Produced<RecordKeyMapper.PartitionnedKey<byte[]>, byte[]> withProducer() {
        PKSerDes keySerde = new PKSerDes();
        Serdes.ByteArraySerde valueSerde = new Serdes.ByteArraySerde();
        return Produced.with(keySerde, valueSerde,
                (topic, key, value, numPartitions) -> key.part() % numPartitions);
    }

    /**
     * Announces the stream as ready on the event bus once the first record has been processed, or
     * after 30 seconds at the latest.
     *
     * @param partitionDecorator supplier exposing the "first record processed" future
     * @param installationTopics topics published (as JSON) with the announcement
     * @return the same {@code installationTopics}, returned immediately
     */
    private InstallationTopics publishTopics(PartitionDecorator partitionDecorator, InstallationTopics installationTopics) {
        Duration maxWait = Duration.ofSeconds(30L);
        logger.info("[stream] Waiting at most {} to notify model verticle", maxWait);
        CompletableFuture<Void> firstRecord = partitionDecorator.firstRecordPromise();
        firstRecord.orTimeout(maxWait.toMillis(), TimeUnit.MILLISECONDS).whenComplete((ignored, failure) -> {
            if (failure != null) {
                logger.warn("[stream] Took more than {} to process the first record, we will proceed anyway", maxWait);
            } else {
                logger.info("[stream] Announcing dir entries stream ready: {}", JsonObject.mapFrom(installationTopics));
            }
            // The announcement is sent whether or not a first record was seen in time.
            this.vertx.eventBus().publish("proxy-address", JsonObject.mapFrom(installationTopics), ProxyEventBusAddress.STREAM_READY);
        });
        return installationTopics;
    }
}
