package net.bluemind.forest.instance.service;

import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import net.bluemind.core.api.fault.ServerFault;
import net.bluemind.core.container.api.IContainers;
import net.bluemind.core.container.model.ContainerChangeset;
import net.bluemind.core.container.model.ContainerDescriptor;
import net.bluemind.core.container.model.ItemValue;
import net.bluemind.core.rest.BmContext;
import net.bluemind.core.rest.ServerSideServiceProvider;
import net.bluemind.core.utils.JsonUtils;
import net.bluemind.directory.api.DirEntry;
import net.bluemind.directory.api.IDirectory;
import net.bluemind.forest.instance.api.ConsumerSetup;
import net.bluemind.forest.instance.api.IForestOrders;
import net.bluemind.forest.instance.api.ProducerSetup;
import net.bluemind.kafka.configuration.LocalConsumer;
import net.bluemind.kafka.configuration.LocalProducer;
import org.apache.curator.shaded.com.google.common.collect.Lists;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:net/bluemind/forest/instance/service/ForestOrdersService.class */
public class ForestOrdersService implements IForestOrders {
    private static final Logger logger = LoggerFactory.getLogger(ForestOrdersService.class);
    private final BmContext context;

    /* loaded from: input_file:net/bluemind/forest/instance/service/ForestOrdersService$FOrdersServiceFactory.class */
    public static class FOrdersServiceFactory implements ServerSideServiceProvider.IServerSideServiceFactory<IForestOrders> {
        public Class<IForestOrders> factoryClass() {
            return IForestOrders.class;
        }

        /* renamed from: instance, reason: merged with bridge method [inline-methods] */
        public IForestOrders m3instance(BmContext bmContext, String... strArr) throws ServerFault {
            return new ForestOrdersService(bmContext);
        }
    }

    public ForestOrdersService(BmContext bmContext) {
        this.context = bmContext;
        logger.debug("{}", this.context);
    }

    public void producer(ProducerSetup producerSetup) {
        logger.info("Producer... {}", producerSetup);
        KafkaProducer<String, String> create = LocalProducer.create(producerSetup.broker);
        long lastContainerVersionInTopic = lastContainerVersionInTopic(producerSetup);
        System.err.println("********* PRODUCE FOR " + producerSetup.kafkaTopic);
        try {
            ContainerDescriptor containerDescriptor = ((IContainers) this.context.provider().instance(IContainers.class, new String[0])).get(producerSetup.containerUid);
            logger.info("Start producing to {} at container version {}, type {}", new Object[]{producerSetup.kafkaTopic, Long.valueOf(lastContainerVersionInTopic), containerDescriptor.type});
            String str = containerDescriptor.type;
            switch (str.hashCode()) {
                case 99469:
                    if (str.equals("dir")) {
                        System.err.println("Produce for dir");
                        produceDelta(create, producerSetup, wrapDir((IDirectory) this.context.provider().instance(IDirectory.class, new String[]{containerDescriptor.domainUid})), lastContainerVersionInTopic);
                        break;
                    }
                default:
                    System.err.println("Don't know how to produce data to kafka for type " + containerDescriptor.type);
                    break;
            }
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
        }
    }

    private long lastContainerVersionInTopic(ProducerSetup producerSetup) {
        long j = 0;
        String uuid = UUID.randomUUID().toString();
        KafkaConsumer create = LocalConsumer.create(producerSetup.broker, "pre-produce-" + uuid, uuid);
        List asList = Arrays.asList(producerSetup.kafkaTopic);
        logger.info("Subscribing to {}", asList);
        create.subscribe(asList, new ConsumerRebalanceListener() { // from class: net.bluemind.forest.instance.service.ForestOrdersService.1
            public void onPartitionsRevoked(Collection<TopicPartition> collection) {
            }

            public void onPartitionsAssigned(Collection<TopicPartition> collection) {
                ForestOrdersService.logger.info("Got partitions {}", collection);
            }
        });
        create.poll(Duration.ZERO);
        Set assignment = create.assignment();
        logger.info("Current partitions {}", assignment);
        Map endOffsets = create.endOffsets(assignment);
        logger.info("end offsets {}", endOffsets);
        Map.Entry entry = null;
        for (Map.Entry entry2 : endOffsets.entrySet()) {
            logger.info("Checking {}", entry2);
            if (entry == null || ((Long) entry2.getValue()).longValue() > ((Long) entry.getValue()).longValue()) {
                entry = entry2;
            }
        }
        logger.info("Most recent: {}", entry);
        if (entry != null) {
            create.seek((TopicPartition) entry.getKey(), ((Long) entry.getValue()).longValue() - 1);
            ConsumerRecords poll = create.poll(Duration.ofSeconds(1L));
            logger.info("Got records: {}", Integer.valueOf(poll.count()));
            Iterator it = poll.iterator();
            while (it.hasNext()) {
                String str = (String) ((ConsumerRecord) it.next()).value();
                if (str.charAt(0) == '{') {
                    try {
                        j = new ObjectMapper().readTree(str).get("version").asLong(0L);
                        logger.info("Container version from last item is {}", Long.valueOf(j));
                    } catch (IOException e) {
                        logger.error("value {}: {}", new Object[]{str, e.getMessage(), e});
                    }
                } else {
                    j = Long.parseLong(str);
                }
            }
        }
        return j;
    }

    private IChangesetAndLoad<DirEntry> wrapDir(final IDirectory iDirectory) {
        return new IChangesetAndLoad<DirEntry>() { // from class: net.bluemind.forest.instance.service.ForestOrdersService.2
            @Override // net.bluemind.forest.instance.service.IChangesetAndLoad
            public ContainerChangeset<String> changeset(Long l) throws ServerFault {
                return iDirectory.changeset(l);
            }

            @Override // net.bluemind.forest.instance.service.IChangesetAndLoad
            public List<ItemValue<DirEntry>> fetchMultiple(List<String> list) {
                return iDirectory.getMultiple(list);
            }
        };
    }

    private <T> void produceDelta(KafkaProducer<String, String> kafkaProducer, ProducerSetup producerSetup, IChangesetAndLoad<T> iChangesetAndLoad, long j) {
        ContainerChangeset<String> changeset = iChangesetAndLoad.changeset(Long.valueOf(j));
        String l = Long.toString(changeset.version);
        Iterator it = changeset.deleted.iterator();
        while (it.hasNext()) {
            kafkaProducer.send(new ProducerRecord(producerSetup.kafkaTopic, (String) it.next(), l));
        }
        Iterator it2 = Lists.partition(changeset.created, 100).iterator();
        while (it2.hasNext()) {
            for (ItemValue<T> itemValue : iChangesetAndLoad.fetchMultiple((List) it2.next())) {
                kafkaProducer.send(new ProducerRecord(producerSetup.kafkaTopic, itemValue.uid, JsonUtils.asString(itemValue)));
            }
        }
        Iterator it3 = Lists.partition(changeset.updated, 100).iterator();
        while (it3.hasNext()) {
            for (ItemValue<T> itemValue2 : iChangesetAndLoad.fetchMultiple((List) it3.next())) {
                kafkaProducer.send(new ProducerRecord(producerSetup.kafkaTopic, itemValue2.uid, JsonUtils.asString(itemValue2)));
            }
        }
    }

    public void consumer(ConsumerSetup consumerSetup) {
        logger.info("Consumer... {}", consumerSetup);
    }
}
