package net.bluemind.hornetq.client.impl;

import com.google.common.collect.Iterables;
import io.lettuce.core.RedisChannelHandler;
import io.lettuce.core.RedisClient;
import io.lettuce.core.RedisConnectionStateListener;
import io.lettuce.core.TrackingArgs;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.codec.StringCodec;
import io.lettuce.core.pubsub.StatefulRedisPubSubConnection;
import io.netty.util.concurrent.DefaultThreadFactory;
import io.vertx.core.json.JsonObject;
import java.net.SocketAddress;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.LongAdder;
import net.bluemind.keydb.common.ClientProvider;
import net.bluemind.keydb.common.JsonRedisCodec;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:net/bluemind/hornetq/client/impl/RedisConnection.class */
public class RedisConnection implements AutoCloseable {
    private static final Logger logger = LoggerFactory.getLogger((Class<?>) RedisConnection.class);
    private static final DefaultThreadFactory tf = new DefaultThreadFactory("keydb-consume-loop");
    private static final Iterator<ExecutorService> cyclic = Iterables.cycle(Executors.newSingleThreadExecutor(tf), Executors.newSingleThreadExecutor(tf), Executors.newSingleThreadExecutor(tf), Executors.newSingleThreadExecutor(tf)).iterator();
    private final RedisClient redisClient;
    private final StatefulRedisPubSubConnection<String, JsonObject> pubSubConnection;
    private final StatefulRedisConnection<String, String> commands;
    private final Map<String, Runnable> invalidationListeners = new ConcurrentHashMap();
    private final Map<String, Runnable> reconnectListeners = new ConcurrentHashMap();
    private final LongAdder invalidations = new LongAdder();

    public RedisConnection(String str) {
        this.redisClient = ClientProvider.newClient(str);
        this.pubSubConnection = this.redisClient.connectPubSub(new JsonRedisCodec());
        this.commands = this.redisClient.connect();
        initInvalidationListener();
        initConnectionListener();
    }

    private void initInvalidationListener() {
        this.commands.addListener(pushMessage -> {
            if (pushMessage.getType().equals("invalidate")) {
                this.invalidations.increment();
                StringCodec stringCodec = StringCodec.ASCII;
                stringCodec.getClass();
                List list = (List) pushMessage.getContent(stringCodec::decodeKey).get(1);
                if (list != null) {
                    list.forEach(str -> {
                        Optional.ofNullable(this.invalidationListeners.get(str)).ifPresent(runnable -> {
                            pushExecutor().submit(runnable);
                        });
                    });
                }
            }
        });
        enableTracking();
    }

    private void initConnectionListener() {
        this.redisClient.addListener(new RedisConnectionStateListener() { // from class: net.bluemind.hornetq.client.impl.RedisConnection.1
            public void onRedisConnected(RedisChannelHandler<?, ?> redisChannelHandler, SocketAddress socketAddress) {
                if (redisChannelHandler == RedisConnection.this.commands) {
                    Thread.ofVirtual().start(() -> {
                        RedisConnection.this.enableTracking();
                        RedisConnection.this.reconnectListeners.forEach((str, runnable) -> {
                            RedisConnection.this.pushExecutor().submit(runnable);
                        });
                        RedisConnection.logger.debug("[keydb] Reconnected, tracking set");
                    });
                }
            }
        });
    }

    private void enableTracking() {
        this.commands.sync().clientTracking(TrackingArgs.Builder.enabled());
    }

    public ExecutorService pushExecutor() {
        return cyclic.next();
    }

    public long invalidationsReceived() {
        return this.invalidations.sum();
    }

    public StatefulRedisPubSubConnection<String, JsonObject> pubsub() {
        return this.pubSubConnection;
    }

    public void registerInvalidationListener(String str, Runnable runnable) {
        this.invalidationListeners.put(str, runnable);
    }

    public void registerReconnectListener(String str, Runnable runnable) {
        this.reconnectListeners.put(str, runnable);
    }

    public StatefulRedisConnection<String, String> keyvalue() {
        return this.commands;
    }

    @Override // java.lang.AutoCloseable
    public void close() {
        this.redisClient.close();
    }
}
