package net.bluemind.cql.sequences.zk;

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import com.typesafe.config.ConfigMergeable;
import java.io.File;
import java.io.InputStream;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;
import java.lang.runtime.ObjectMethods;
import java.nio.file.Files;
import java.nio.file.OpenOption;
import java.time.Duration;
import java.util.EnumSet;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.IntStream;
import java.util.stream.LongStream;
import net.bluemind.config.InstallationId;
import net.bluemind.repository.sequences.ISequenceStore;
import net.bluemind.repository.sequences.SequencePersistenceException;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.DeleteBuilderMain;
import org.apache.curator.framework.recipes.atomic.DistributedAtomicLong;
import org.apache.curator.retry.BoundedExponentialBackoffRetry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:net/bluemind/cql/sequences/zk/ZkSequenceStore.class */
public class ZkSequenceStore implements ISequenceStore {
    private static final Logger logger = LoggerFactory.getLogger(ZkSequenceStore.class);
    /** Capabilities advertised by {@link #capabilities()}. */
    private static final Set<ISequenceStore.StoreCapability> CAPS = EnumSet.of(ISequenceStore.StoreCapability.DURABLE, ISequenceStore.StoreCapability.PARTITION_TOLERANT, ISequenceStore.StoreCapability.DISTRIBUTED);
    /** Effective configuration, resolved once at class initialisation by {@link #loadConfig()}. */
    private static final Config conf = loadConfig();
    /** ZooKeeper path prefix for every sequence node; the installation id is embedded with its "bluemind-" part removed. */
    private static final String ROOT = "/bluemind/v5/" + InstallationId.getIdentifier().replace("bluemind-", "") + "/sequences/";
    /** Holds the shared Curator client; only the key "curator" is ever used (see curatorFramework). */
    private static final ConcurrentHashMap<String, CuratorFramework> con = new ConcurrentHashMap<>();
    /** Per-sequence-name state (prefetched values + distributed counter); entries expire 10 minutes after their last access. */
    private static final Cache<String, WithPrefetch> locks = Caffeine.newBuilder().expireAfterAccess(Duration.ofMinutes(10)).build();
    /**
     * Static, hence shared by all sequences of this class: lower bound applied when a new block of values is reserved.
     * Written by setMinimumValue and raised by every value handed out by WithPrefetch.next().
     */
    private static final AtomicLong MINIMUM_SEQUENCE = new AtomicLong(0);

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:net/bluemind/cql/sequences/zk/ZkSequenceStore$WithPrefetch.class */
    public static final class WithPrefetch extends Record {
        private final ConcurrentLinkedQueue<Long> prefetch;
        private final DistributedAtomicLong seq;
        private final AtomicLong lastVal;
        private static final int PREFETCH_SIZE = ZkSequenceStore.conf.getInt("persistence.zk.prefetch");

        private WithPrefetch(ConcurrentLinkedQueue<Long> concurrentLinkedQueue, DistributedAtomicLong distributedAtomicLong, AtomicLong atomicLong) {
            this.prefetch = concurrentLinkedQueue;
            this.seq = distributedAtomicLong;
            this.lastVal = atomicLong;
        }

        /* JADX WARN: Multi-variable type inference failed */
        /* JADX WARN: Type inference failed for: r0v16, types: [long] */
        /* JADX WARN: Type inference failed for: r0v2, types: [java.util.concurrent.ConcurrentLinkedQueue<java.lang.Long>] */
        /* JADX WARN: Type inference failed for: r0v3, types: [java.lang.Throwable] */
        public long next() {
            try {
                ?? r0 = this.prefetch;
                synchronized (r0) {
                    if (this.prefetch.isEmpty()) {
                        long j = ZkSequenceStore.MINIMUM_SEQUENCE.get();
                        long longValue = ((Long) this.seq.add(Long.valueOf(PREFETCH_SIZE)).postValue()).longValue();
                        if (longValue < j + PREFETCH_SIZE) {
                            longValue = j + PREFETCH_SIZE;
                            this.seq.forceSet(Long.valueOf(longValue));
                        }
                        LongStream rangeClosed = LongStream.rangeClosed((longValue - PREFETCH_SIZE) + 1, longValue);
                        ConcurrentLinkedQueue<Long> concurrentLinkedQueue = this.prefetch;
                        concurrentLinkedQueue.getClass();
                        rangeClosed.forEach((v1) -> {
                            r1.add(v1);
                        });
                    }
                    long longValue2 = this.prefetch.poll().longValue();
                    if (longValue2 > ZkSequenceStore.MINIMUM_SEQUENCE.get()) {
                        ZkSequenceStore.MINIMUM_SEQUENCE.set(longValue2);
                    }
                    this.lastVal.set(longValue2);
                    r0 = longValue2;
                }
                return r0;
            } catch (Exception e) {
                throw new SequencePersistenceException(e);
            }
        }

        public long cur() {
            return ZkSequenceStore.MINIMUM_SEQUENCE.get() > 0 ? ZkSequenceStore.MINIMUM_SEQUENCE.get() : this.lastVal.get();
        }

        public ConcurrentLinkedQueue<Long> prefetch() {
            return this.prefetch;
        }

        public DistributedAtomicLong seq() {
            return this.seq;
        }

        public AtomicLong lastVal() {
            return this.lastVal;
        }

        @Override // java.lang.Record
        public final String toString() {
            return (String) ObjectMethods.bootstrap(MethodHandles.lookup(), "toString", MethodType.methodType(String.class, WithPrefetch.class), WithPrefetch.class, "prefetch;seq;lastVal", "FIELD:Lnet/bluemind/cql/sequences/zk/ZkSequenceStore$WithPrefetch;->prefetch:Ljava/util/concurrent/ConcurrentLinkedQueue;", "FIELD:Lnet/bluemind/cql/sequences/zk/ZkSequenceStore$WithPrefetch;->seq:Lorg/apache/curator/framework/recipes/atomic/DistributedAtomicLong;", "FIELD:Lnet/bluemind/cql/sequences/zk/ZkSequenceStore$WithPrefetch;->lastVal:Ljava/util/concurrent/atomic/AtomicLong;").dynamicInvoker().invoke(this) /* invoke-custom */;
        }

        @Override // java.lang.Record
        public final int hashCode() {
            return (int) ObjectMethods.bootstrap(MethodHandles.lookup(), "hashCode", MethodType.methodType(Integer.TYPE, WithPrefetch.class), WithPrefetch.class, "prefetch;seq;lastVal", "FIELD:Lnet/bluemind/cql/sequences/zk/ZkSequenceStore$WithPrefetch;->prefetch:Ljava/util/concurrent/ConcurrentLinkedQueue;", "FIELD:Lnet/bluemind/cql/sequences/zk/ZkSequenceStore$WithPrefetch;->seq:Lorg/apache/curator/framework/recipes/atomic/DistributedAtomicLong;", "FIELD:Lnet/bluemind/cql/sequences/zk/ZkSequenceStore$WithPrefetch;->lastVal:Ljava/util/concurrent/atomic/AtomicLong;").dynamicInvoker().invoke(this) /* invoke-custom */;
        }

        @Override // java.lang.Record
        public final boolean equals(Object obj) {
            return (boolean) ObjectMethods.bootstrap(MethodHandles.lookup(), "equals", MethodType.methodType(Boolean.TYPE, WithPrefetch.class, Object.class), WithPrefetch.class, "prefetch;seq;lastVal", "FIELD:Lnet/bluemind/cql/sequences/zk/ZkSequenceStore$WithPrefetch;->prefetch:Ljava/util/concurrent/ConcurrentLinkedQueue;", "FIELD:Lnet/bluemind/cql/sequences/zk/ZkSequenceStore$WithPrefetch;->seq:Lorg/apache/curator/framework/recipes/atomic/DistributedAtomicLong;", "FIELD:Lnet/bluemind/cql/sequences/zk/ZkSequenceStore$WithPrefetch;->lastVal:Ljava/util/concurrent/atomic/AtomicLong;").dynamicInvoker().invoke(this, obj) /* invoke-custom */;
        }
    }

    /**
     * Builds the effective configuration by layering three sources. Highest priority
     * first: the default application config, then {@code /etc/bm/cql-persistence.conf}
     * when that file exists, then the {@code resources/cql-persistence.conf} classpath resource.
     *
     * <p>A failure while reading the {@code /etc/bm} file is logged and that layer is skipped.
     *
     * @return the merged configuration
     */
    private static Config loadConfig() {
        Config bundled = ConfigFactory.load(ZkSequenceStore.class.getClassLoader(), "resources/cql-persistence.conf");
        Config layered = bundled;
        File systemFile = new File("/etc/bm/cql-persistence.conf");
        try {
            if (systemFile.exists()) {
                layered = ConfigFactory.parseFile(systemFile).withFallback(bundled);
            }
        } catch (Exception e) {
            // Best effort: keep going with the bundled defaults only.
            logger.error(e.getMessage(), e);
        }
        return ConfigFactory.defaultApplication().withFallback(layered);
    }

    /**
     * Returns the shared Curator client, creating, starting and connecting it on first use.
     *
     * @param config configuration used to resolve the ZooKeeper connection string
     * @return the connected client
     * @throws SequencePersistenceException if the thread is interrupted while waiting for the connection
     */
    private CuratorFramework curatorFramework(Config config) {
        return con.computeIfAbsent("curator", key -> {
            CuratorFramework client = CuratorFrameworkFactory.newClient(zkBootstrap(config), new BoundedExponentialBackoffRetry(100, 10000, 15));
            client.start();
            try {
                client.blockUntilConnected();
                return client;
            } catch (InterruptedException e) {
                // The client was started but will never be stored in the map:
                // close it so it is not leaked.
                try {
                    client.close();
                } finally {
                    Thread.currentThread().interrupt();
                }
                throw new SequencePersistenceException(e);
            }
        });
    }

    /**
     * Reserves and returns the next value of the named sequence.
     *
     * @param str the sequence name
     * @return the next value
     */
    public long nextVal(String str) {
        WithPrefetch sequence = distributedLong(str);
        return sequence.next();
    }

    /**
     * Consumes {@code i} consecutive values of the named sequence and returns the last one.
     *
     * @param str the sequence name
     * @param i   how many values to consume
     * @return the last of the consumed values
     * @throws java.util.NoSuchElementException if {@code i} is zero or negative
     */
    public long nextVals(String str, int i) {
        WithPrefetch sequence = distributedLong(str);
        if (i <= 0) {
            // Nothing was consumed, so there is no "last" value to return.
            throw new java.util.NoSuchElementException("No value present");
        }
        long last = 0L;
        for (int taken = 0; taken < i; taken++) {
            last = sequence.next();
        }
        return last;
    }

    /**
     * Returns the current value of the named sequence without consuming one.
     *
     * @param str the sequence name
     * @return the current value as reported by the sequence state
     */
    public long curVal(String str) {
        WithPrefetch sequence = distributedLong(str);
        return sequence.cur();
    }

    /**
     * Overwrites the shared lower bound used when reserving new blocks of values.
     * The value is stored unconditionally, so this can also lower the bound.
     *
     * @param j the new lower bound
     */
    public void setMinimumValue(long j) {
        MINIMUM_SEQUENCE.set(j);
    }

    /**
     * Reads the value currently stored in the distributed counter, converting any
     * failure into a {@link SequencePersistenceException}.
     *
     * @param distributedAtomicLong the counter to read
     * @return the stored value
     */
    private static long safeCurVal(DistributedAtomicLong distributedAtomicLong) {
        try {
            Long stored = distributedAtomicLong.get().postValue();
            return stored.longValue();
        } catch (Exception e) {
            throw new SequencePersistenceException(e);
        }
    }

    /**
     * Returns the cached state for the named sequence, creating it on first access.
     * A new state starts with an empty prefetch queue and a last value read from ZooKeeper.
     *
     * @param str the sequence name, appended to {@code ROOT} to form the ZooKeeper path
     * @return the state for that sequence
     */
    private WithPrefetch distributedLong(String str) {
        return locks.get(str, name -> {
            CuratorFramework client = curatorFramework(conf);
            String path = ROOT + name;
            DistributedAtomicLong counter = new DistributedAtomicLong(client, path, new BoundedExponentialBackoffRetry(100, 10000, 15));
            AtomicLong lastSeen = new AtomicLong(safeCurVal(counter));
            return new WithPrefetch(new ConcurrentLinkedQueue<>(), counter, lastSeen);
        });
    }

    /**
     * Resolves the ZooKeeper connection string.
     *
     * <p>The {@code persistence.zk.servers} setting is used when present. Otherwise the
     * {@code zookeeper.servers} property is read from a legacy {@code kafka.properties}
     * file, looked up in {@code /etc/bm} and then in the user's home directory.
     *
     * @param config the configuration to consult; may be {@code null}
     * @return the connection string, or {@code null} when none could be determined
     */
    private static String zkBootstrap(Config config) {
        if (config != null && config.hasPath("persistence.zk.servers")) {
            String configured = config.getString("persistence.zk.servers");
            if (configured != null) {
                return configured;
            }
        }

        File legacy = new File("/etc/bm/kafka.properties");
        if (!legacy.exists()) {
            legacy = new File(System.getProperty("user.home") + "/kafka.properties");
        }
        if (!legacy.exists()) {
            logger.warn("persistence.zk.servers is not set and no legacy kafka.properties was found");
            return null;
        }

        // Log the file that is actually read, not merely the first candidate.
        logger.warn("Loading from legacy {}", legacy.getAbsolutePath());
        Properties properties = new Properties();
        try (InputStream in = Files.newInputStream(legacy.toPath())) {
            properties.load(in);
        } catch (Exception e) {
            // Best effort: an unreadable legacy file yields no connection string.
            logger.warn("Unable to read {}: {}", legacy.getAbsolutePath(), e.getMessage(), e);
            return null;
        }
        return properties.getProperty("zookeeper.servers");
    }

    /**
     * Deletes the ZooKeeper node backing the named sequence and logs the removal.
     * The entry cached in {@code locks} for that name is not evicted here.
     *
     * @param str the sequence name
     * @throws SequencePersistenceException if the deletion fails
     */
    public void drop(String str) {
        String path = ROOT + str;
        try {
            CuratorFramework client = curatorFramework(conf);
            ((DeleteBuilderMain) client.delete().quietly()).forPath(path);
            logger.info("Sequence {} dropped.", str);
        } catch (Exception e) {
            throw new SequencePersistenceException(e);
        }
    }

    /**
     * Returns the capabilities of this store: durable, partition tolerant and distributed.
     *
     * @return the shared capability set held in {@code CAPS}
     */
    public Set<ISequenceStore.StoreCapability> capabilities() {
        return CAPS;
    }
}
