package net.bluemind.core.rest.vertx;

import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.eventbus.Message;
import io.vertx.core.eventbus.MessageConsumer;
import io.vertx.core.streams.ReadStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:net/bluemind/core/rest/vertx/VertxStreamProducerControlHandler.class */
/**
 * Listens on an event-bus control address and, driven by the text commands received there,
 * pipes a body {@link ReadStream} into a {@link VertxStreamProducer}.
 *
 * <p>Commands handled by {@link #handleControlMessage(Message)}:
 * <ul>
 *   <li>{@code ready:<address>} - create a producer for {@code <address>} and start piping,
 *       leaving the body stream paused;</li>
 *   <li>{@code ready-and-resume:<address>} - same, then resume the body stream;</li>
 *   <li>{@code resume} - call {@code drain()} on the current producer;</li>
 *   <li>{@code pause} - call {@code markQueueFull()} on the current producer;</li>
 *   <li>{@code close} - call {@code closed()} on the current producer (if any) and unregister
 *       the control consumer.</li>
 * </ul>
 * Any other command is ignored.
 */
public class VertxStreamProducerControlHandler {
    private static final Logger logger = LoggerFactory.getLogger(VertxStreamProducerControlHandler.class);

    private static final String READY_PREFIX = "ready:";
    private static final String READY_AND_RESUME_PREFIX = "ready-and-resume:";
    private static final String CMD_RESUME = "resume";
    private static final String CMD_PAUSE = "pause";
    private static final String CMD_CLOSE = "close";

    private final String controlAdr;
    private final Vertx vertx;
    private final ReadStream<Buffer> bodyStream;
    /** Producer created by the first "ready" command; {@code null} until then. */
    private VertxStreamProducer producer;
    /** Control-address consumer; {@code null} before {@link #stream()} and after close. */
    private MessageConsumer<String> cons;

    /**
     * @param vertx      Vert.x instance used for the event bus and for context scheduling
     * @param str        event-bus address on which control commands are received
     * @param readStream body stream that will be piped to the producer
     */
    public VertxStreamProducerControlHandler(Vertx vertx, String str, ReadStream<Buffer> readStream) {
        this.vertx = vertx;
        this.controlAdr = str;
        this.bodyStream = readStream;
    }

    /**
     * Pauses the body stream and registers the control-address consumer. Each received message
     * is handed to {@link #handleControlMessage(Message)} via {@code runOnContext}.
     */
    public void stream() {
        // ReadStream exposes pause()/resume(); hold the body until a "ready" command arrives.
        this.bodyStream.pause();
        this.cons = this.vertx.eventBus().consumer(this.controlAdr, message -> {
            this.vertx.runOnContext(ignored -> {
                handleControlMessage(message);
            });
        });
    }

    /**
     * Dispatches one control command (see the class documentation for the command list).
     *
     * <p>{@code resume} and {@code pause} received before any producer exists are logged and
     * ignored instead of failing with a {@link NullPointerException}; a message without a body
     * is ignored as well.
     *
     * @param message control message whose body is the command text
     */
    protected void handleControlMessage(Message<String> message) {
        String body = message.body();
        logger.info("receive something {}", body);
        if (body == null) {
            // Nothing to dispatch on; startsWith/equals below would otherwise throw.
            return;
        }
        if (body.startsWith(READY_PREFIX)) {
            String target = body.substring(READY_PREFIX.length());
            logger.info("recieve ready from {}", target);
            stream(this.vertx, target, false);
            return;
        }
        if (body.startsWith(READY_AND_RESUME_PREFIX)) {
            String target = body.substring(READY_AND_RESUME_PREFIX.length());
            logger.debug("reciveve ready and stream from {}", target);
            stream(this.vertx, target, true);
            return;
        }
        if (body.equals(CMD_RESUME)) {
            if (this.producer == null) {
                logger.warn("ignoring '{}' on {}: no producer yet", body, this.controlAdr);
                return;
            }
            this.producer.drain();
            return;
        }
        if (body.equals(CMD_PAUSE)) {
            if (this.producer == null) {
                logger.warn("ignoring '{}' on {}: no producer yet", body, this.controlAdr);
                return;
            }
            this.producer.markQueueFull();
        } else if (body.equals(CMD_CLOSE)) {
            logger.info("close before completion !");
            if (this.producer != null) {
                this.producer.closed();
            }
            close();
        }
    }

    /** Unregisters the control consumer, if still registered. Safe to call more than once. */
    private void close() {
        logger.debug("closestream {}", this.controlAdr);
        if (this.cons != null) {
            this.cons.unregister();
            this.cons = null;
        }
    }

    /**
     * Creates a producer for the given address and pipes the body stream into it. When the pipe
     * completes, the producer's {@code sendEnd()} is called and the control consumer is
     * unregistered.
     *
     * @param vertx Vert.x instance handed to the new producer
     * @param str   address handed to the new producer
     * @param z     when {@code true}, the body stream is resumed right after the pipe is set up
     */
    protected void stream(Vertx vertx, String str, boolean z) {
        // Keep a local reference so the completion handler always ends the producer this pipe
        // was attached to, even if the field is reassigned by a later "ready" command.
        final VertxStreamProducer target = new VertxStreamProducer(vertx, str);
        this.producer = target;
        this.bodyStream.pipe().endOnComplete(false).to(target, asyncResult -> {
            if (asyncResult.failed()) {
                logger.warn("pipe to {} failed", str, asyncResult.cause());
            }
            target.sendEnd();
            close();
        });
        if (z) {
            this.bodyStream.resume();
        }
    }
}
