package net.bluemind.eas.wbxml.builder.vertx;

import io.vertx.core.AbstractVerticle;
import io.vertx.core.Handler;
import io.vertx.core.eventbus.Message;
import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import net.bluemind.eas.dto.base.DisposableByteSource;
import net.bluemind.vertx.common.LocalJsonObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:net/bluemind/eas/wbxml/builder/vertx/ByteSourceEventProducer.class */
public class ByteSourceEventProducer extends AbstractVerticle {
    public static final String REGISTER = "wbxml.bytesource.register";
    public static final String NEXT_CHUNK = "wbxml.stream.next.chunk";
    private final NextStreamChunkRequestHandler nextHandler = new NextStreamChunkRequestHandler(this, null);
    private static final Map<String, InputStream> liveStreams = new ConcurrentHashMap();
    private static final Map<String, DisposableByteSource> disposables = new ConcurrentHashMap();
    private static final AtomicLong streamId = new AtomicLong(0);
    private static final Logger logger = LoggerFactory.getLogger(ByteSourceEventProducer.class);

    /* loaded from: input_file:net/bluemind/eas/wbxml/builder/vertx/ByteSourceEventProducer$NextStreamChunkRequestHandler.class */
    private final class NextStreamChunkRequestHandler implements Handler<Message<String>> {
        private final LocalJsonObject<Chunk> LAST;
        private final LocalJsonObject<Chunk> UNKNOWN;

        private NextStreamChunkRequestHandler() {
            this.LAST = new LocalJsonObject<>(Chunk.LAST);
            this.UNKNOWN = new LocalJsonObject<>(Chunk.UNKNOWN);
        }

        public void handle(Message<String> message) {
            String str = (String) message.body();
            ByteSourceEventProducer.logger.debug("Chunk request for stream {}", str);
            InputStream inputStream = (InputStream) ByteSourceEventProducer.liveStreams.get(str);
            if (inputStream == null) {
                ByteSourceEventProducer.logger.error("{} ************ Stream {} is unknown", this, str);
                message.reply(this.UNKNOWN);
                return;
            }
            byte[] bArr = new byte[65536];
            try {
                int read = inputStream.read(bArr);
                if (read == -1) {
                    ((InputStream) ByteSourceEventProducer.liveStreams.remove(str)).close();
                    ((DisposableByteSource) ByteSourceEventProducer.disposables.remove(str)).dispose();
                    message.reply(this.LAST);
                    if (ByteSourceEventProducer.logger.isDebugEnabled()) {
                        ByteSourceEventProducer.logger.debug("Sending last chunk for stream {}.", str);
                    }
                } else {
                    Chunk chunk = new Chunk();
                    chunk.buf = Arrays.copyOf(bArr, read);
                    message.reply(new LocalJsonObject(chunk));
                    ByteSourceEventProducer.logger.debug("Sent {} byte(s) chunk.", Integer.valueOf(read));
                }
            } catch (IOException e) {
                ByteSourceEventProducer.logger.error(e.getMessage(), e);
            }
        }

        /* synthetic */ NextStreamChunkRequestHandler(ByteSourceEventProducer byteSourceEventProducer, NextStreamChunkRequestHandler nextStreamChunkRequestHandler) {
            this();
        }
    }

    public void start() {
        this.vertx.eventBus().consumer(REGISTER, message -> {
            DisposableByteSource disposableByteSource = (DisposableByteSource) ((LocalJsonObject) message.body()).getValue();
            String str = "stream" + streamId.incrementAndGet();
            try {
                InputStream openStream = disposableByteSource.source().openStream();
                liveStreams.put(str, openStream);
                disposables.put(str, disposableByteSource);
                if (logger.isDebugEnabled()) {
                    logger.debug("{} stream {} registered {}", new Object[]{this, str, openStream});
                }
                message.reply(str);
            } catch (IOException e) {
                logger.error(e.getMessage(), e);
            }
        });
        this.vertx.eventBus().consumer(NEXT_CHUNK, this.nextHandler);
    }
}
