package net.bluemind.core.rest.vertx;

import io.vertx.core.Vertx;
import io.vertx.core.eventbus.Message;
import io.vertx.core.eventbus.MessageConsumer;
import java.util.UUID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:net/bluemind/core/rest/vertx/VertxStreamConsumerControlHandler.class */
public class VertxStreamConsumerControlHandler {
    private String controlAddress;
    private Vertx vertx;
    private VertxStreamConsumer stream;
    private MessageConsumer<VertxRestStreamObject> cons;
    private Logger logger = LoggerFactory.getLogger((Class<?>) VertxStreamConsumerControlHandler.class);
    private String recvAddress = null;

    public VertxStreamConsumerControlHandler(Vertx vertx, VertxStreamConsumer vertxStreamConsumer, String str) {
        this.vertx = vertx;
        this.stream = vertxStreamConsumer;
        this.controlAddress = str;
    }

    public void start(boolean z) {
        if (this.recvAddress != null) {
            if (z) {
                this.logger.debug("resume stream {}", this.controlAddress);
                this.vertx.eventBus().send(this.controlAddress, "resume");
                return;
            }
            return;
        }
        this.recvAddress = UUID.randomUUID().toString();
        handleData(this.recvAddress);
        this.logger.debug("ready stream {}", this.controlAddress);
        if (z) {
            this.logger.debug("send ready and resume to {}", this.controlAddress);
            this.vertx.eventBus().send(this.controlAddress, "ready-and-resume:" + this.recvAddress);
        } else {
            this.logger.debug("send ready to {}", this.controlAddress);
            this.vertx.eventBus().send(this.controlAddress, "ready:" + this.recvAddress);
        }
    }

    private void handleData(String str) {
        this.cons = this.vertx.eventBus().consumer(str, message -> {
            this.vertx.runOnContext(r5 -> {
                handleStreamObject(message);
            });
        });
    }

    protected void handleStreamObject(Message<VertxRestStreamObject> message) {
        VertxRestStreamObject body = message.body();
        this.logger.debug("receive data ({}) from stream {} end : {} ", body.data, this.controlAddress, Boolean.valueOf(body.end));
        if (!body.end) {
            if (body.end) {
                return;
            }
            this.stream.pushData(body.data);
        } else if (this.stream.endHandler == null) {
            this.logger.warn("no end handler!");
        } else {
            this.stream.pushEnd();
            close();
        }
    }

    public void pause() {
        this.logger.debug("pause stream {}", this.controlAddress);
        this.vertx.eventBus().send(this.controlAddress, "pause");
    }

    public void resume() {
        this.logger.debug("resume stream {}", this.controlAddress);
        this.vertx.eventBus().send(this.controlAddress, "resume");
    }

    public void sendClose() {
        this.logger.debug("close stream {}", this.controlAddress);
        this.vertx.eventBus().send(this.controlAddress, "close");
        close();
    }

    private void close() {
        if (this.cons != null) {
            this.cons.unregister();
            this.cons = null;
        }
    }
}
