package net.bluemind.forest.cloud.service;

import com.fasterxml.jackson.core.JsonProcessingException;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.Verticle;
import io.vertx.core.json.JsonObject;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import net.bluemind.core.rest.http.ClientSideServiceProvider;
import net.bluemind.forest.cloud.api.Instance;
import net.bluemind.forest.instance.api.ForestEnpoints;
import net.bluemind.forest.instance.api.IForestEnrollment;
import net.bluemind.forest.instance.api.IForestOrders;
import net.bluemind.forest.instance.api.ProducerSetup;
import net.bluemind.kafka.configuration.Brokers;
import net.bluemind.kafka.configuration.IKafkaBroker;
import net.bluemind.kafka.configuration.LocalConsumer;
import net.bluemind.kafka.configuration.LocalProducer;
import net.bluemind.lib.vertx.IUniqueVerticleFactory;
import net.bluemind.lib.vertx.IVerticleFactory;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:net/bluemind/forest/cloud/service/JoiningInstancesProcessor.class */
public class JoiningInstancesProcessor extends AbstractVerticle {
    private static final Logger logger = LoggerFactory.getLogger(JoiningInstancesProcessor.class);
    private KafkaConsumer<String, String> consumer;
    private boolean stopped;
    private KafkaProducer<String, String> producer;

    /* loaded from: input_file:net/bluemind/forest/cloud/service/JoiningInstancesProcessor$VertxFacto.class */
    public static class VertxFacto implements IVerticleFactory, IUniqueVerticleFactory {
        public boolean isWorker() {
            return true;
        }

        public Verticle newInstance() {
            return new JoiningInstancesProcessor();
        }
    }

    public void start() {
        logger.info("Starting {}", this);
        IKafkaBroker locate = Brokers.locate();
        String str = String.valueOf(locate.inspectAddress()) + ":" + locate.port();
        this.consumer = LocalConsumer.create(str, "forest-joining-group", "processor-" + ForestJoinService.JVM_ID);
        this.producer = LocalProducer.create(str);
        this.consumer.subscribe(Arrays.asList(KafkaTopics.JOINING), new ConsumerRebalanceListener() { // from class: net.bluemind.forest.cloud.service.JoiningInstancesProcessor.1
            public void onPartitionsRevoked(Collection<TopicPartition> collection) {
                JoiningInstancesProcessor.logger.info("partitionsRevoked {}", collection);
            }

            public void onPartitionsAssigned(Collection<TopicPartition> collection) {
                JoiningInstancesProcessor.logger.info("partitionsAssigned {}", collection);
                System.err.println("Seek " + collection + " !!!!");
                JoiningInstancesProcessor.this.consumer.seekToBeginning(collection);
            }
        });
        this.vertx.runOnContext(this::consume);
    }

    public void stop() {
        this.stopped = true;
    }

    private void consume(Void r9) {
        Iterator it = this.consumer.poll(Duration.ofMillis(500L)).iterator();
        while (it.hasNext()) {
            ConsumerRecord consumerRecord = (ConsumerRecord) it.next();
            String str = (String) consumerRecord.value();
            logger.info("offset = {}, key = {}, value = {}", new Object[]{Long.valueOf(consumerRecord.offset()), consumerRecord.key(), str});
            if (str == null) {
                logger.info("{} does not need joining anymore", consumerRecord.key());
            } else {
                try {
                    Instance instance = (Instance) Mapper.get().readValue(str, Instance.class);
                    join(instance).whenComplete((r10, th) -> {
                        if (th != null) {
                            logger.warn("Instance processing of {} failed. Requeue ??", instance.installationId);
                            return;
                        }
                        System.err.println("storing join to hazelcast...");
                        Activator.getHazelcast().getMap(KafkaTopics.JOINED).set(instance.installationId, str);
                        this.producer.send(new ProducerRecord(KafkaTopics.JOINED, instance.installationId, str), (recordMetadata, exc) -> {
                            logger.info("Send {} to JOINED cb {}, {}", new Object[]{instance.installationId, recordMetadata, exc});
                        });
                        this.producer.send(new ProducerRecord(KafkaTopics.JOINING, instance.installationId, (Object) null), (recordMetadata2, exc2) -> {
                            logger.info("Delete {} from JOINING cb {}, {}", new Object[]{instance.installationId, recordMetadata2, exc2});
                        });
                    }).get(30L, TimeUnit.SECONDS);
                } catch (Exception e) {
                    logger.error(e.getMessage(), e);
                }
            }
        }
        if (this.stopped) {
            return;
        }
        this.vertx.runOnContext(this::consume);
    }

    private CompletableFuture<Void> join(Instance instance) {
        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
        System.err.println("Working on joining instance " + instance.installationId + "...");
        try {
            logger.info("instance {}", new JsonObject(Mapper.get().writeValueAsString(instance)).encodePrettily());
        } catch (JsonProcessingException e) {
            logger.error(e.getMessage(), e);
        }
        ClientSideServiceProvider origin = ClientSideServiceProvider.getProvider(instance.externalUrl, instance.coreToken).setOrigin(ForestJoinService.ORIGIN);
        ((IForestEnrollment) origin.instance(IForestEnrollment.class, new String[0])).checkpoint(new ForestEnpoints());
        IForestOrders iForestOrders = (IForestOrders) origin.instance(IForestOrders.class, new String[0]);
        ArrayList arrayList = new ArrayList(instance.aliases.size());
        logger.info("=================== Working on {} aliases...", Integer.valueOf(instance.aliases.size()));
        ForestJoinService.withAdminClient((adminClient, iKafkaBroker) -> {
            for (Instance.Partition partition : instance.aliases) {
                String str = String.valueOf(instance.installationId) + ".dir." + partition.domain;
                String str2 = String.valueOf(instance.installationId) + ".addressbook.addressbook_" + partition.domain;
                NewTopic newTopic = new NewTopic(str, 5, (short) iKafkaBroker.maxReplicas());
                NewTopic newTopic2 = new NewTopic(str2, 5, (short) iKafkaBroker.maxReplicas());
                arrayList.add(adminClient.createTopics(Arrays.asList(newTopic, newTopic2)).all().whenComplete((r14, th) -> {
                    if (th != null) {
                        logger.error("Error creating topics {} {}", new Object[]{newTopic, newTopic2, th});
                        return;
                    }
                    logger.info("Fresh topics created, setting up producers {} {}", str, str2);
                    ProducerSetup producerSetup = new ProducerSetup();
                    producerSetup.containerUid = partition.domain;
                    producerSetup.kafkaTopic = str;
                    producerSetup.broker = String.valueOf(iKafkaBroker.inspectAddress()) + ":" + iKafkaBroker.port();
                    iForestOrders.producer(producerSetup);
                    ProducerSetup producerSetup2 = new ProducerSetup();
                    producerSetup2.containerUid = "addressbook_" + partition.domain;
                    producerSetup2.kafkaTopic = str2;
                    producerSetup2.broker = String.valueOf(iKafkaBroker.inspectAddress()) + ":" + iKafkaBroker.port();
                    iForestOrders.producer(producerSetup2);
                }));
            }
        });
        KafkaFuture.allOf((KafkaFuture[]) arrayList.toArray(new KafkaFuture[0])).whenComplete((r6, th) -> {
            if (th != null) {
                completableFuture.completeExceptionally(th);
            } else {
                logger.info("{} JOINED.", instance.installationId);
                completableFuture.complete(null);
            }
        });
        return completableFuture;
    }
}
