package net.bluemind.tx.outbox.service;

import io.vertx.core.AbstractVerticle;
import io.vertx.core.Verticle;
import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.Supplier;
import net.bluemind.core.backup.continuous.api.IBackupStore;
import net.bluemind.core.backup.continuous.api.IBackupStoreFactory;
import net.bluemind.core.backup.continuous.api.Providers;
import net.bluemind.core.container.model.BaseContainerDescriptor;
import net.bluemind.core.container.model.DataLocation;
import net.bluemind.core.context.SecurityContext;
import net.bluemind.core.rest.ServerSideServiceProvider;
import net.bluemind.lib.vertx.IUniqueVerticleFactory;
import net.bluemind.lib.vertx.IVerticleFactory;
import net.bluemind.lib.vertx.VertxPlatform;
import net.bluemind.network.topology.Topology;
import net.bluemind.repository.provider.RepositoryProvider;
import net.bluemind.server.api.TagDescriptor;
import net.bluemind.system.api.SystemState;
import net.bluemind.system.state.StateContext;
import net.bluemind.tx.outbox.repository.ITxOutboxRepository;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:net/bluemind/tx/outbox/service/TxOutboxBackupStoreFlusher.class */
public class TxOutboxBackupStoreFlusher extends AbstractVerticle {
    public static final int SLICE = 500;
    private final Supplier<ITxOutboxRepository> repository;
    private final String logName;
    private static final Logger logger = LoggerFactory.getLogger(TxOutboxBackupStoreFlusher.class);
    private static final Set<SystemState> FLUSH_SAFE = EnumSet.of(SystemState.CORE_STATE_STARTING, SystemState.CORE_STATE_RUNNING, SystemState.CORE_STATE_DEMOTED);

    /* loaded from: input_file:net/bluemind/tx/outbox/service/TxOutboxBackupStoreFlusher$RegBjData.class */
    public static class RegBjData implements IVerticleFactory, IUniqueVerticleFactory {
        public boolean isWorker() {
            return true;
        }

        public Verticle newInstance() {
            return new TxOutboxBackupStoreFlusher("BJ-DATA", () -> {
                return (ITxOutboxRepository) RepositoryProvider.instance(ITxOutboxRepository.class, ServerSideServiceProvider.getProvider(SecurityContext.SYSTEM).getContext(), (DataLocation) mboxLocations().getFirst());
            });
        }

        private List<DataLocation> mboxLocations() {
            return (List) Topology.getIfAvailable().map(iServiceTopology -> {
                return iServiceTopology.all(new String[]{TagDescriptor.bm_pgsql_data.getTag(), TagDescriptor.mail_imap.getTag()}).stream().distinct().map(itemValue -> {
                    return DataLocation.of(itemValue.uid);
                }).toList();
            }).orElseGet(Collections::emptyList);
        }
    }

    /* loaded from: input_file:net/bluemind/tx/outbox/service/TxOutboxBackupStoreFlusher$RegDirectory.class */
    public static class RegDirectory implements IVerticleFactory, IUniqueVerticleFactory {
        public boolean isWorker() {
            return true;
        }

        public Verticle newInstance() {
            return new TxOutboxBackupStoreFlusher("BJ", () -> {
                return (ITxOutboxRepository) RepositoryProvider.instance(ITxOutboxRepository.class, ServerSideServiceProvider.getProvider(SecurityContext.SYSTEM).getContext(), DataLocation.directory());
            });
        }
    }

    public TxOutboxBackupStoreFlusher(String str, Supplier<ITxOutboxRepository> supplier) {
        this.repository = supplier;
        this.logName = str;
    }

    public void start() throws Exception {
        scheduleFlush(500L);
    }

    private long scheduleFlush(long j) {
        return VertxPlatform.executeBlockingTimer(this.vertx, j, l -> {
            flushSlice();
        });
    }

    public long flushSlice() {
        IBackupStoreFactory iBackupStoreFactory = Providers.get();
        if (!iBackupStoreFactory.isPaused() && FLUSH_SAFE.contains(StateContext.getState()) && iBackupStoreFactory.leadership().isLeader()) {
            ArrayList arrayList = new ArrayList(SLICE);
            LongAdder longAdder = new LongAdder();
            try {
                ITxOutboxRepository iTxOutboxRepository = this.repository.get();
                List kafkaPending = iTxOutboxRepository.kafkaPending(SLICE);
                Iterator it = kafkaPending.iterator();
                while (it.hasNext()) {
                    ITxOutboxRepository.KafkaPayload kafkaPayload = iTxOutboxRepository.get(((Long) it.next()).longValue());
                    IBackupStore forContainer = iBackupStoreFactory.forContainer(BaseContainerDescriptor.create("dummy_uid", "dummy_name", kafkaPayload.partKey(), "type", kafkaPayload.domainUid(), false));
                    if (logger.isTraceEnabled()) {
                        logger.trace("Flush d:{},o:{} to {}", new Object[]{kafkaPayload.domainUid(), kafkaPayload.partKey(), forContainer});
                    }
                    byte[] value = kafkaPayload.value();
                    if (value.length == 0) {
                        value = null;
                    }
                    arrayList.add(forContainer.storeRaw(kafkaPayload.partKey(), kafkaPayload.key(), value).thenAccept(r3 -> {
                        longAdder.increment();
                    }));
                }
                CompletableFuture.allOf((CompletableFuture[]) arrayList.toArray(new CompletableFuture[arrayList.size()])).join();
                long sum = longAdder.sum();
                if (sum > 0) {
                    long longValue = ((Long) kafkaPending.getLast()).longValue();
                    iTxOutboxRepository.deleteOffsets(kafkaPending);
                    iTxOutboxRepository.seqHolder().flushed.set(longValue);
                    logger.debug("[{}] Flushed {} item(s) up to id {} to store; seq is at {}", new Object[]{this.logName, Long.valueOf(sum), Long.valueOf(longValue), iTxOutboxRepository.seqHolder().flushed});
                }
            } catch (Exception e) {
                logger.error("Kafka outbox transient failure", e);
            }
            return scheduleFlush(longAdder.sum() == 500 ? 5 : SLICE);
        }
        return scheduleFlush(500L);
    }
}
