package net.bluemind.hornetq.client;

import io.lettuce.core.api.sync.RedisCommands;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import net.bluemind.hornetq.client.MQ;
import net.bluemind.hornetq.client.MQKeyDB;
import net.bluemind.hornetq.client.impl.RedisConnection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:net/bluemind/hornetq/client/KeyDBSharedMap.class */
public class KeyDBSharedMap<K, V> implements MQ.SharedMap<K, V> {
    private static final String FLUSH_SHARED_MAPS = "flushSharedMaps";
    private final String mapName;
    private final RedisCommands<String, String> commands;
    private final MQKeyDB.Codec<K> keyCodec;
    private final MQKeyDB.Codec<V> valCodec;
    private final AtomicReference<Map<String, String>> cached;
    private static final Logger logger = LoggerFactory.getLogger((Class<?>) KeyDBSharedMap.class);
    private static final ConcurrentHashMap<String, AtomicReference<Map<String, String>>> perKeyCache = new ConcurrentHashMap<>();

    public KeyDBSharedMap(String str, MQKeyDB.Codec<K> codec, MQKeyDB.Codec<V> codec2, RedisConnection redisConnection) {
        this.mapName = str;
        this.commands = redisConnection.keyvalue().sync();
        this.keyCodec = codec;
        this.valCodec = codec2;
        this.cached = perKeyCache.computeIfAbsent(this.mapName, str2 -> {
            AtomicReference atomicReference = new AtomicReference(this.commands.hgetall(str2));
            redisConnection.registerInvalidationListener(str2, () -> {
                logger.info("Refresh {} from server", str2);
                atomicReference.set(this.commands.hgetall(str2));
            });
            redisConnection.registerReconnectListener(str2, () -> {
                this.commands.get(FLUSH_SHARED_MAPS);
                this.commands.hgetall(str2);
            });
            return atomicReference;
        });
        this.commands.get(FLUSH_SHARED_MAPS);
        redisConnection.registerInvalidationListener(FLUSH_SHARED_MAPS, () -> {
            logger.info("Flush local shared map");
            perKeyCache.clear();
        });
    }

    @Override // net.bluemind.hornetq.client.MQ.SharedMap
    public void put(K k, V v) {
        String codec = this.keyCodec.toString(k);
        String codec2 = this.valCodec.toString(v);
        this.commands.hset(this.mapName, codec, codec2);
        this.cached.get().put(codec, codec2);
    }

    @Override // net.bluemind.hornetq.client.MQ.SharedMap
    public V get(K k) {
        String str = this.cached.get().get(this.keyCodec.toString(k));
        if (str == null) {
            return null;
        }
        return this.valCodec.fromString(str);
    }

    @Override // net.bluemind.hornetq.client.MQ.SharedMap
    public Set<K> keys() {
        Stream<String> stream = this.cached.get().keySet().stream();
        MQKeyDB.Codec<K> codec = this.keyCodec;
        codec.getClass();
        return (Set) stream.map(codec::fromString).collect(Collectors.toSet());
    }

    @Override // net.bluemind.hornetq.client.MQ.SharedMap
    public void remove(K k) {
        String codec = this.keyCodec.toString(k);
        this.commands.hdel(this.mapName, new String[]{codec});
        this.cached.get().remove(codec);
    }
}
