package net.bluemind.core.backup.continuous.events.bodies;

import io.vertx.core.AbstractVerticle;
import io.vertx.core.Verticle;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.json.JsonObject;
import java.io.File;
import java.io.InputStream;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;
import java.lang.runtime.ObjectMethods;
import java.nio.file.Files;
import java.nio.file.OpenOption;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;
import net.bluemind.config.InstallationId;
import net.bluemind.core.backup.continuous.DefaultBackupStore;
import net.bluemind.core.container.model.BaseContainerDescriptor;
import net.bluemind.core.context.SecurityContext;
import net.bluemind.core.rest.ServerSideServiceProvider;
import net.bluemind.domain.api.Domain;
import net.bluemind.domain.api.IDomains;
import net.bluemind.lib.vertx.IUniqueVerticleFactory;
import net.bluemind.lib.vertx.IVerticleFactory;
import net.bluemind.lib.vertx.VertxPlatform;
import net.bluemind.system.api.ISystemConfiguration;
import net.bluemind.system.api.SysConfKeys;
import net.bluemind.system.api.SystemState;
import net.bluemind.system.state.StateContext;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TaskMetadata;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;
import org.apache.kafka.streams.kstream.Produced;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:net/bluemind/core/backup/continuous/events/bodies/BodiesMigrationVerticle.class */
public class BodiesMigrationVerticle extends AbstractVerticle {
    private static final Logger logger = LoggerFactory.getLogger(BodiesMigrationVerticle.class);
    private LongAdder bodies = new LongAdder();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:net/bluemind/core/backup/continuous/events/bodies/BodiesMigrationVerticle$Bootstrap.class */
    public static final class Bootstrap extends Record {
        private final String zookeeper;
        private final String brokers;

        private Bootstrap(String str, String str2) {
            this.zookeeper = str;
            this.brokers = str2;
        }

        boolean valid() {
            return (this.zookeeper == null || this.brokers == null) ? false : true;
        }

        public String zookeeper() {
            return this.zookeeper;
        }

        public String brokers() {
            return this.brokers;
        }

        @Override // java.lang.Record
        public final String toString() {
            return (String) ObjectMethods.bootstrap(MethodHandles.lookup(), "toString", MethodType.methodType(String.class, Bootstrap.class), Bootstrap.class, "zookeeper;brokers", "FIELD:Lnet/bluemind/core/backup/continuous/events/bodies/BodiesMigrationVerticle$Bootstrap;->zookeeper:Ljava/lang/String;", "FIELD:Lnet/bluemind/core/backup/continuous/events/bodies/BodiesMigrationVerticle$Bootstrap;->brokers:Ljava/lang/String;").dynamicInvoker().invoke(this) /* invoke-custom */;
        }

        @Override // java.lang.Record
        public final int hashCode() {
            return (int) ObjectMethods.bootstrap(MethodHandles.lookup(), "hashCode", MethodType.methodType(Integer.TYPE, Bootstrap.class), Bootstrap.class, "zookeeper;brokers", "FIELD:Lnet/bluemind/core/backup/continuous/events/bodies/BodiesMigrationVerticle$Bootstrap;->zookeeper:Ljava/lang/String;", "FIELD:Lnet/bluemind/core/backup/continuous/events/bodies/BodiesMigrationVerticle$Bootstrap;->brokers:Ljava/lang/String;").dynamicInvoker().invoke(this) /* invoke-custom */;
        }

        @Override // java.lang.Record
        public final boolean equals(Object obj) {
            return (boolean) ObjectMethods.bootstrap(MethodHandles.lookup(), "equals", MethodType.methodType(Boolean.TYPE, Bootstrap.class, Object.class), Bootstrap.class, "zookeeper;brokers", "FIELD:Lnet/bluemind/core/backup/continuous/events/bodies/BodiesMigrationVerticle$Bootstrap;->zookeeper:Ljava/lang/String;", "FIELD:Lnet/bluemind/core/backup/continuous/events/bodies/BodiesMigrationVerticle$Bootstrap;->brokers:Ljava/lang/String;").dynamicInvoker().invoke(this, obj) /* invoke-custom */;
        }
    }

    /* loaded from: input_file:net/bluemind/core/backup/continuous/events/bodies/BodiesMigrationVerticle$Factory.class */
    public static class Factory implements IVerticleFactory, IUniqueVerticleFactory {
        public boolean isWorker() {
            return true;
        }

        public Verticle newInstance() {
            return new BodiesMigrationVerticle();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:net/bluemind/core/backup/continuous/events/bodies/BodiesMigrationVerticle$MigStateListener.class */
    public static class MigStateListener implements KafkaStreams.StateListener {
        private long periodic = -1;
        private Vertx vertx;
        private KafkaStreams stream;
        private LongAdder processed;

        public MigStateListener(KafkaStreams kafkaStreams, Vertx vertx, LongAdder longAdder) {
            this.stream = kafkaStreams;
            this.vertx = vertx;
            this.processed = longAdder;
        }

        public void onChange(KafkaStreams.State state, KafkaStreams.State state2) {
            BodiesMigrationVerticle.logger.info("state {} -> {}", state2, state);
            if (state == KafkaStreams.State.RUNNING) {
                this.periodic = this.vertx.setPeriodic(60000L, l -> {
                    ConcurrentHashMap concurrentHashMap = new ConcurrentHashMap();
                    this.stream.metadataForLocalThreads().forEach(threadMetadata -> {
                        for (TaskMetadata taskMetadata : threadMetadata.activeTasks()) {
                            Map committedOffsets = taskMetadata.committedOffsets();
                            Map endOffsets = taskMetadata.endOffsets();
                            committedOffsets.forEach((topicPartition, l) -> {
                                long longValue = ((Long) endOffsets.getOrDefault(topicPartition, l)).longValue() - l.longValue();
                                concurrentHashMap.put(String.valueOf(taskMetadata.taskId()) + "-" + topicPartition.topic() + "#" + topicPartition.partition(), Long.valueOf(longValue < 0 ? 0L : longValue));
                            });
                        }
                    });
                    long reduceValuesToLong = concurrentHashMap.reduceValuesToLong(8L, (v0) -> {
                        return v0.longValue();
                    }, 0L, (j, j2) -> {
                        return j + j2;
                    });
                    BodiesMigrationVerticle.logger.info("lag is {}, processed {}", Long.valueOf(reduceValuesToLong), Long.valueOf(this.processed.sum()));
                    if (reduceValuesToLong == 0) {
                        this.vertx.executeBlocking(() -> {
                            ((ISystemConfiguration) ServerSideServiceProvider.getProvider(SecurityContext.SYSTEM).instance(ISystemConfiguration.class, new String[0])).updateMutableValues(Map.of(SysConfKeys.kafka_bodies_migrated.name(), "true"));
                            BodiesMigrationVerticle.logger.info("Closing kstream as we moved everything");
                            this.stream.close();
                            return null;
                        });
                    }
                });
            } else {
                this.vertx.cancelTimer(this.periodic);
            }
        }
    }

    public void start() throws Exception {
        VertxPlatform.executeBlockingTimer(this.vertx, 10000L, (v1) -> {
            startImpl(v1);
        });
    }

    private void startImpl(long j) {
        if (StateContext.getState() != SystemState.CORE_STATE_RUNNING) {
            VertxPlatform.executeBlockingTimer(this.vertx, 10000L, (v1) -> {
                startImpl(v1);
            });
            return;
        }
        if (Boolean.getBoolean(SysConfKeys.kafka_bodies_migrated.name())) {
            VertxPlatform.executeBlockingTimer(this.vertx, 10000L, (v1) -> {
                startImpl(v1);
            });
            return;
        }
        Bootstrap kafkaBootstrapServers = kafkaBootstrapServers();
        if (!kafkaBootstrapServers.valid()) {
            VertxPlatform.executeBlockingTimer(this.vertx, 30000L, (v1) -> {
                startImpl(v1);
            });
            return;
        }
        ServerSideServiceProvider provider = ServerSideServiceProvider.getProvider(SecurityContext.SYSTEM);
        Boolean booleanValue = ((ISystemConfiguration) provider.instance(ISystemConfiguration.class, new String[0])).getValues().booleanValue(SysConfKeys.kafka_bodies_migrated.name(), false);
        if (booleanValue != null && booleanValue.booleanValue()) {
            logger.info("Bodies migration is complete, no need to start over.");
            return;
        }
        String replace = InstallationId.getIdentifier().replace("bluemind-", "").replace("-", "");
        List list = ((IDomains) provider.instance(IDomains.class, new String[0])).all().stream().filter(itemValue -> {
            return !((Domain) itemValue.value).global;
        }).map(itemValue2 -> {
            return replace + "-" + itemValue2.uid;
        }).toList();
        String str = replace + "-bodies.store";
        logger.info("Steam to {}", str);
        withValidClassloader(() -> {
            initStream(kafkaBootstrapServers, list, str);
        });
    }

    private void initStream(Bootstrap bootstrap, List<String> list, String str) {
        JsonObject of = JsonObject.of();
        KeyValue keyValue = new KeyValue(of, of);
        logger.info("Created {} to ensure topic exists and is setup properly", DefaultBackupStore.store().forContainer(BaseContainerDescriptor.create("osef", "name", "aa_owner", "message_bodies", "bodies.store", true)));
        Properties properties = new Properties();
        properties.put("application.id", "core-body-migration-" + InstallationId.getIdentifier());
        properties.put("bootstrap.servers", bootstrap.brokers());
        properties.put("num.stream.threads", Integer.valueOf(Math.min(Runtime.getRuntime().availableProcessors(), 4)));
        properties.put("default.key.serde", Serdes.ByteArraySerde.class.getName());
        properties.put("default.value.serde", Serdes.ByteArraySerde.class.getName());
        properties.put(StreamsConfig.producerPrefix("max.request.size"), 12582912);
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        streamsBuilder.stream(list).map((bArr, bArr2) -> {
            JsonObject jsonObject = new JsonObject(Buffer.buffer(bArr));
            String string = jsonObject.getString("type");
            switch (string.hashCode()) {
                case 517169400:
                    if (string.equals("message_bodies")) {
                        this.bodies.increment();
                        return bodyKeyValue(jsonObject, bArr2);
                    }
                    break;
                case 806411845:
                    if (string.equals("message_bodies_es_source")) {
                        this.bodies.increment();
                        return esSourceKeyValue(jsonObject, bArr2);
                    }
                    break;
            }
            return keyValue;
        }).filter((jsonObject, jsonObject2) -> {
            return jsonObject != of;
        }).to(str, producer());
        createStream(streamsBuilder.build(), properties);
    }

    private void withValidClassloader(Runnable runnable) {
        ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
        Thread.currentThread().setContextClassLoader(null);
        try {
            try {
                runnable.run();
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
                Thread.currentThread().setContextClassLoader(contextClassLoader);
            }
        } finally {
            Thread.currentThread().setContextClassLoader(contextClassLoader);
        }
    }

    private void createStream(Topology topology, Properties properties) {
        KafkaStreams kafkaStreams = new KafkaStreams(topology, properties);
        kafkaStreams.setUncaughtExceptionHandler(th -> {
            logger.error(th.getMessage(), th);
            return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT;
        });
        kafkaStreams.setStateListener(new MigStateListener(kafkaStreams, this.vertx, this.bodies));
        kafkaStreams.start();
    }

    private KeyValue<JsonObject, JsonObject> esSourceKeyValue(JsonObject jsonObject, byte[] bArr) {
        JsonObject jsonObject2 = new JsonObject(Buffer.buffer(bArr));
        String substring = jsonObject2.getString("uid").substring(0, 2);
        jsonObject.put("owner", substring + "_owner");
        jsonObject.put("uid", substring + "_owner_at_bodies.store_message_bodies");
        jsonObject.put("operation", "CREATE");
        return new KeyValue<>(jsonObject, jsonObject2);
    }

    private KeyValue<JsonObject, JsonObject> bodyKeyValue(JsonObject jsonObject, byte[] bArr) {
        JsonObject jsonObject2 = new JsonObject(Buffer.buffer(bArr));
        String substring = jsonObject2.getJsonObject("value").getString("guid").substring(0, 2);
        jsonObject.put("owner", substring + "_owner");
        jsonObject.put("uid", substring + "_owner_at_bodies.store_message_bodies_es_source");
        jsonObject.put("operation", "CREATE");
        return new KeyValue<>(jsonObject, jsonObject2);
    }

    Produced<JsonObject, JsonObject> producer() {
        Serde<JsonObject> serdes = serdes();
        return Produced.with(serdes, serdes, (str, jsonObject, jsonObject2, i) -> {
            return Integer.valueOf(Math.abs(jsonObject.getString("owner").hashCode() % i));
        });
    }

    private Serde<JsonObject> serdes() {
        return new Serde<JsonObject>() { // from class: net.bluemind.core.backup.continuous.events.bodies.BodiesMigrationVerticle.1
            public Serializer<JsonObject> serializer() {
                return (str, jsonObject) -> {
                    return jsonObject.encode().getBytes();
                };
            }

            public Deserializer<JsonObject> deserializer() {
                return new Deserializer<JsonObject>() { // from class: net.bluemind.core.backup.continuous.events.bodies.BodiesMigrationVerticle.1.1
                    /* renamed from: deserialize, reason: merged with bridge method [inline-methods] */
                    public JsonObject m7deserialize(String str, byte[] bArr) {
                        return new JsonObject(Buffer.buffer(bArr));
                    }
                };
            }
        };
    }

    private Bootstrap kafkaBootstrapServers() {
        InputStream newInputStream;
        String property = System.getProperty("bm.kafka.bootstrap.servers");
        String property2 = System.getProperty("bm.zk.servers");
        if (property == null || property2 == null) {
            File file = new File("/etc/bm/kafka.properties");
            if (!file.exists()) {
                file = new File(System.getProperty("user.home") + "/kafka.properties");
            }
            if (file.exists()) {
                Properties properties = new Properties();
                Throwable th = null;
                try {
                    try {
                        newInputStream = Files.newInputStream(file.toPath(), new OpenOption[0]);
                    } catch (Throwable th2) {
                        if (0 == 0) {
                            th = th2;
                        } else if (null != th2) {
                            th.addSuppressed(th2);
                        }
                        throw th;
                    }
                } catch (Exception e) {
                    logger.warn(e.getMessage());
                }
                try {
                    properties.load(newInputStream);
                    if (newInputStream != null) {
                        newInputStream.close();
                    }
                    property = properties.getProperty("bootstrap.servers");
                    property2 = properties.getProperty("zookeeper.servers");
                } catch (Throwable th3) {
                    if (newInputStream != null) {
                        newInputStream.close();
                    }
                    throw th3;
                }
            }
        }
        return new Bootstrap(property2, property);
    }
}
