package net.bluemind.forest.cloud.service;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.hazelcast.core.EntryEvent;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.IMap;
import com.hazelcast.map.listener.EntryAddedListener;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BiConsumer;
import net.bluemind.core.api.fault.ServerFault;
import net.bluemind.core.rest.BmContext;
import net.bluemind.core.rest.ServerSideServiceProvider;
import net.bluemind.forest.cloud.api.ForestTopology;
import net.bluemind.forest.cloud.api.IForestJoin;
import net.bluemind.forest.cloud.api.Instance;
import net.bluemind.kafka.configuration.Brokers;
import net.bluemind.kafka.configuration.IKafkaBroker;
import net.bluemind.kafka.configuration.LocalProducer;
import net.bluemind.kafka.configuration.StaticTopics;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:net/bluemind/forest/cloud/service/ForestJoinService.class */
public class ForestJoinService implements IForestJoin {
    private static final Logger logger = LoggerFactory.getLogger(ForestJoinService.class);
    public static final IKafkaBroker storageLayer = Brokers.locate();
    public static final UUID JVM_ID = UUID.randomUUID();
    public static final String ORIGIN = "forest-cloud-" + JVM_ID;
    private final String sharedAlias;
    private final BmContext context;
    private final KafkaProducer<String, String> producer;
    private final ObjectMapper objectMapper = new ObjectMapper();
    private final HazelcastInstance hz;

    /* loaded from: input_file:net/bluemind/forest/cloud/service/ForestJoinService$FJoinServiceFactory.class */
    public static class FJoinServiceFactory implements ServerSideServiceProvider.IServerSideServiceFactory<IForestJoin> {
        private final KafkaProducer<String, String> prod = LocalProducer.create(String.valueOf(ForestJoinService.storageLayer.inspectAddress()) + ":9093");

        public Class<IForestJoin> factoryClass() {
            return IForestJoin.class;
        }

        /* renamed from: instance, reason: merged with bridge method [inline-methods] */
        public IForestJoin m1instance(BmContext bmContext, String... strArr) throws ServerFault {
            if (strArr == null || strArr.length < 1) {
                throw new ServerFault("wrong number of instance parameters");
            }
            return new ForestJoinService(bmContext, this.prod, Activator.getHazelcast(), strArr[0]);
        }
    }

    public static void init() {
        logger.info("Storage layer is {}", storageLayer);
        withAdminClient((adminClient, iKafkaBroker) -> {
            try {
                StaticTopics.reconfigure(adminClient, iKafkaBroker).get(1L, TimeUnit.MINUTES);
            } catch (InterruptedException | ExecutionException | TimeoutException e) {
                logger.error(e.getMessage(), e);
            }
        });
    }

    public static void withAdminClient(BiConsumer<AdminClient, IKafkaBroker> biConsumer) {
        String inspectAddress = storageLayer.inspectAddress();
        Properties properties = new Properties();
        properties.put("bootstrap.servers", String.valueOf(inspectAddress) + ":" + storageLayer.port());
        properties.put("client.id", "forest-rest-" + JVM_ID.toString());
        Throwable th = null;
        try {
            AdminClient create = AdminClient.create(properties);
            try {
                biConsumer.accept(create, storageLayer);
                if (create != null) {
                    create.close();
                }
            } catch (Throwable th2) {
                if (create != null) {
                    create.close();
                }
                throw th2;
            }
        } catch (Throwable th3) {
            if (0 == 0) {
                th = th3;
            } else if (null != th3) {
                th.addSuppressed(th3);
            }
            throw th;
        }
    }

    public ForestJoinService(BmContext bmContext, KafkaProducer<String, String> kafkaProducer, HazelcastInstance hazelcastInstance, String str) {
        this.sharedAlias = str;
        this.context = bmContext;
        this.producer = kafkaProducer;
        this.hz = hazelcastInstance;
        logger.debug("{} {}", this.context, this.sharedAlias);
    }

    public ForestTopology handshake(final Instance instance) {
        logger.info("handshake {}", instance);
        ForestTopology forestTopology = new ForestTopology();
        forestTopology.broker = ForestTopology.KafkaListener.of(storageLayer.kafkaListener());
        try {
            String writeValueAsString = this.objectMapper.writeValueAsString(instance);
            IMap map = this.hz.getMap(KafkaTopics.JOINED);
            final CompletableFuture completableFuture = new CompletableFuture();
            String addEntryListener = map.addEntryListener(new EntryAddedListener<String, Instance>() { // from class: net.bluemind.forest.cloud.service.ForestJoinService.1
                public void entryAdded(EntryEvent<String, Instance> entryEvent) {
                    if (((String) entryEvent.getKey()).equals(instance.installationId)) {
                        completableFuture.complete(null);
                    }
                }
            }, false);
            this.producer.send(new ProducerRecord(KafkaTopics.JOINING, instance.installationId, writeValueAsString), (recordMetadata, exc) -> {
                logger.info("pushed {} to JOINING {}, {}", new Object[]{instance.installationId, recordMetadata, exc});
            });
            try {
                completableFuture.get(18L, TimeUnit.SECONDS);
            } catch (InterruptedException | ExecutionException | TimeoutException e) {
                logger.error("unable to join forest: {}", e.getMessage());
            } finally {
                map.removeEntryListener(addEntryListener);
            }
            return forestTopology;
        } catch (JsonProcessingException e2) {
            logger.error(e2.getMessage(), e2);
            throw new ServerFault(e2);
        }
    }
}
