package net.bluemind.core.rest.base;

import io.vertx.core.AsyncResult;
import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.streams.ReadStream;
import io.vertx.core.streams.StreamBase;
import io.vertx.core.streams.WriteStream;
import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import java.util.function.Function;
import net.bluemind.core.api.Stream;
import net.bluemind.core.api.fault.ServerFault;
import net.bluemind.core.rest.vertx.VertxStream;
import net.bluemind.lib.vertx.Result;
import net.bluemind.lib.vertx.VertxContext;
import net.bluemind.lib.vertx.VertxPlatform;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:net/bluemind/core/rest/base/GenericStream.class */
public abstract class GenericStream<T> implements ReadStream<Buffer> {
    private static Logger logger = LoggerFactory.getLogger(GenericStream.class);
    private Handler<Buffer> dataHandler;
    private boolean paused;
    private Handler<Throwable> exceptionHandler;
    private Handler<Void> endHandler;
    private boolean ended;

    /* loaded from: input_file:net/bluemind/core/rest/base/GenericStream$AccumulatorStream.class */
    public static class AccumulatorStream extends BaseStream<Buffer> {
        private final Buffer buffer = Buffer.buffer();

        /* JADX WARN: Multi-variable type inference failed */
        /* JADX WARN: Type inference failed for: r0v2 */
        /* JADX WARN: Type inference failed for: r0v3, types: [java.lang.Throwable] */
        /* JADX WARN: Type inference failed for: r0v7 */
        public Future<Void> write(Buffer buffer) {
            if (buffer != null) {
                ?? r0 = this;
                synchronized (r0) {
                    this.buffer.appendBuffer(buffer);
                    r0 = r0;
                }
            }
            return Future.succeededFuture();
        }

        public Buffer buffer() {
            return this.buffer;
        }

        @Override // net.bluemind.core.rest.base.GenericStream.BaseStream
        public /* bridge */ /* synthetic */ boolean writeQueueFull() {
            return super.writeQueueFull();
        }

        @Override // net.bluemind.core.rest.base.GenericStream.BaseStream
        public /* bridge */ /* synthetic */ void end(Handler handler) {
            super.end(handler);
        }

        @Override // net.bluemind.core.rest.base.GenericStream.BaseStream
        public /* bridge */ /* synthetic */ Future end() {
            return super.end();
        }

        @Override // net.bluemind.core.rest.base.GenericStream.BaseStream
        public /* bridge */ /* synthetic */ void write(Buffer buffer, Handler handler) {
            super.write(buffer, handler);
        }

        @Override // net.bluemind.core.rest.base.GenericStream.BaseStream
        /* renamed from: setWriteQueueMaxSize */
        public /* bridge */ /* synthetic */ BaseStream<Buffer> m83setWriteQueueMaxSize(int i) {
            return super.m83setWriteQueueMaxSize(i);
        }

        @Override // net.bluemind.core.rest.base.GenericStream.BaseStream
        public /* bridge */ /* synthetic */ BaseStream<Buffer> exceptionHandler(Handler handler) {
            return super.exceptionHandler((Handler<Throwable>) handler);
        }

        @Override // net.bluemind.core.rest.base.GenericStream.BaseStream
        public /* bridge */ /* synthetic */ BaseStream<Buffer> drainHandler(Handler handler) {
            return super.drainHandler((Handler<Void>) handler);
        }
    }

    /* loaded from: input_file:net/bluemind/core/rest/base/GenericStream$BaseStream.class */
    private static abstract class BaseStream<T> implements WriteStream<T> {
        private BaseStream() {
        }

        public BaseStream<T> exceptionHandler(Handler<Throwable> handler) {
            return this;
        }

        /* renamed from: setWriteQueueMaxSize, reason: merged with bridge method [inline-methods] */
        public BaseStream<T> m83setWriteQueueMaxSize(int i) {
            return this;
        }

        public boolean writeQueueFull() {
            return false;
        }

        public BaseStream<T> drainHandler(Handler<Void> handler) {
            return this;
        }

        public void write(T t, Handler<AsyncResult<Void>> handler) {
            write(t);
            handler.handle(Result.success());
        }

        public Future<Void> end() {
            return Future.succeededFuture();
        }

        public void end(Handler<AsyncResult<Void>> handler) {
            handler.handle(Result.success());
        }

        /* renamed from: exceptionHandler, reason: collision with other method in class */
        public /* bridge */ /* synthetic */ StreamBase m84exceptionHandler(Handler handler) {
            return exceptionHandler((Handler<Throwable>) handler);
        }

        /* renamed from: exceptionHandler, reason: collision with other method in class */
        public /* bridge */ /* synthetic */ WriteStream m85exceptionHandler(Handler handler) {
            return exceptionHandler((Handler<Throwable>) handler);
        }

        /* renamed from: drainHandler, reason: collision with other method in class */
        public /* bridge */ /* synthetic */ WriteStream m86drainHandler(Handler handler) {
            return drainHandler((Handler<Void>) handler);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:net/bluemind/core/rest/base/GenericStream$FileWriterStream.class */
    public static class FileWriterStream extends BaseStream<Buffer> implements AutoCloseable {
        private OutputStream out;
        private Logger logger = LoggerFactory.getLogger(FileWriterStream.class);

        public FileWriterStream(File file, StandardOpenOption... standardOpenOptionArr) {
            try {
                this.out = Files.newOutputStream(file.toPath(), standardOpenOptionArr);
            } catch (IOException e) {
                this.logger.warn("Cannot open new file {} for writing", file.getAbsolutePath(), e);
            }
        }

        public Future<Void> write(Buffer buffer) {
            try {
                this.out.write(buffer.getBytes());
            } catch (IOException e) {
                this.logger.warn("Cannot stream to file", e);
            }
            return Future.succeededFuture();
        }

        @Override // java.lang.AutoCloseable
        public void close() {
            try {
                this.out.close();
            } catch (IOException unused) {
            }
        }

        @Override // net.bluemind.core.rest.base.GenericStream.BaseStream
        public Future<Void> end() {
            close();
            return super.end();
        }

        @Override // net.bluemind.core.rest.base.GenericStream.BaseStream
        public void end(Handler<AsyncResult<Void>> handler) {
            close();
            super.end(handler);
        }
    }

    /* loaded from: input_file:net/bluemind/core/rest/base/GenericStream$State.class */
    public enum State {
        MORE,
        ENDED;

        /* renamed from: values, reason: to resolve conflict with enum method */
        public static State[] valuesCustom() {
            State[] valuesCustom = values();
            int length = valuesCustom.length;
            State[] stateArr = new State[length];
            System.arraycopy(valuesCustom, 0, stateArr, 0, length);
            return stateArr;
        }
    }

    /* loaded from: input_file:net/bluemind/core/rest/base/GenericStream$StreamState.class */
    public static class StreamState<T> {
        public final State state;
        public final T value;

        public StreamState(State state, T t) {
            this.state = state;
            this.value = t;
        }

        public static <T> StreamState<T> create(State state, T t) {
            return new StreamState<>(state, t);
        }

        public static <T> StreamState<T> data(T t) {
            return create(State.MORE, t);
        }

        public static <T> StreamState<T> end() {
            return create(State.ENDED, null);
        }
    }

    public GenericStream<T> handler(Handler<Buffer> handler) {
        this.dataHandler = handler;
        read();
        return this;
    }

    /* renamed from: fetch, reason: merged with bridge method [inline-methods] */
    public GenericStream<T> m80fetch(long j) {
        return this;
    }

    /* renamed from: pause, reason: merged with bridge method [inline-methods] */
    public GenericStream<T> m76pause() {
        this.paused = true;
        return this;
    }

    /* renamed from: resume, reason: merged with bridge method [inline-methods] */
    public GenericStream<T> m79resume() {
        if (this.paused) {
            this.paused = false;
            read();
        }
        return this;
    }

    public GenericStream<T> exceptionHandler(Handler<Throwable> handler) {
        this.exceptionHandler = handler;
        return this;
    }

    public GenericStream<T> endHandler(Handler<Void> handler) {
        this.endHandler = handler;
        return this;
    }

    private void read() {
        if (this.paused || this.ended) {
            return;
        }
        readSome(VertxContext.getOrCreateDuplicatedContext(VertxPlatform.getVertx()));
    }

    private CompletableFuture<Void> readSome(Context context) {
        return (this.ended || this.paused) ? CompletableFuture.completedFuture(null) : nextAsync(context).thenCompose(streamState -> {
            if (streamState.state == State.ENDED) {
                this.ended = true;
                if (this.endHandler != null) {
                    this.endHandler.handle((Object) null);
                }
                return CompletableFuture.completedFuture(null);
            }
            try {
                return VertxPlatform.getVertx().executeBlocking(() -> {
                    this.dataHandler.handle(serialize(streamState.value));
                    return null;
                }, true).toCompletionStage().toCompletableFuture().thenCompose((Function) obj -> {
                    return readSome(context);
                });
            } catch (ServerFault e) {
                throw e;
            } catch (Exception e2) {
                throw new ServerFault(e2);
            }
        }).exceptionally((Function<Throwable, ? extends U>) th -> {
            error(th);
            return null;
        });
    }

    private CompletableFuture<StreamState<T>> nextAsync(Context context) {
        CompletableFuture<StreamState<T>> completableFuture = new CompletableFuture<>();
        CompletableFuture.supplyAsync(this::safeNext, ExecutorHolder.getAsService()).whenComplete((BiConsumer) (streamState, th) -> {
            context.runOnContext(r6 -> {
                if (th != null) {
                    completableFuture.completeExceptionally(th);
                } else {
                    completableFuture.complete(streamState);
                }
            });
        });
        return completableFuture;
    }

    private final StreamState<T> safeNext() {
        try {
            return next();
        } catch (ServerFault e) {
            throw e;
        } catch (Exception e2) {
            throw new ServerFault(e2);
        }
    }

    protected abstract Buffer serialize(T t) throws Exception;

    protected abstract StreamState<T> next() throws Exception;

    private void error(Throwable th) {
        if (this.exceptionHandler != null) {
            this.exceptionHandler.handle(th);
        }
        logger.error("error reading backup stream", th);
        this.ended = true;
        if (this.endHandler != null) {
            this.endHandler.handle((Object) null);
        }
    }

    public static String streamToString(Stream stream) {
        return streamToBuffer(stream).buffer().toString();
    }

    public static byte[] streamToBytes(Stream stream) {
        return streamToBuffer(stream).buffer().getBytes();
    }

    private static AccumulatorStream streamToBuffer(Stream stream) {
        ReadStream read = VertxStream.read(stream);
        AccumulatorStream accumulatorStream = new AccumulatorStream();
        stream(read, accumulatorStream);
        return accumulatorStream;
    }

    public static CompletableFuture<Buffer> asyncStreamToBuffer(Stream stream) {
        ReadStream read = VertxStream.read(stream);
        AccumulatorStream accumulatorStream = new AccumulatorStream();
        return asyncStream(read, accumulatorStream).thenApply(r3 -> {
            return accumulatorStream.buffer();
        });
    }

    public static CompletableFuture<Void> slowRead(Stream stream) {
        return asyncStream(VertxStream.read(stream), new SlowWriteStream());
    }

    public static void streamToFile(Stream stream, File file) {
        streamToFile(stream, file, StandardOpenOption.CREATE_NEW);
    }

    public static void streamToFile(Stream stream, File file, StandardOpenOption... standardOpenOptionArr) {
        ReadStream read = VertxStream.read(stream);
        Throwable th = null;
        try {
            FileWriterStream fileWriterStream = new FileWriterStream(file, standardOpenOptionArr);
            try {
                stream(read, fileWriterStream);
                if (fileWriterStream != null) {
                    fileWriterStream.close();
                }
            } catch (Throwable th2) {
                if (fileWriterStream != null) {
                    fileWriterStream.close();
                }
                throw th2;
            }
        } catch (Throwable th3) {
            if (0 == 0) {
                th = th3;
            } else if (null != th3) {
                th.addSuppressed(th3);
            }
            throw th;
        }
    }

    public static CompletableFuture<Void> asyncStreamToFile(Stream stream, File file, StandardOpenOption... standardOpenOptionArr) {
        return asyncStreamToFile((ReadStream<Buffer>) VertxStream.read(stream), file, standardOpenOptionArr);
    }

    public static CompletableFuture<Void> asyncStreamToFile(ReadStream<Buffer> readStream, File file, StandardOpenOption... standardOpenOptionArr) {
        return asyncStream(readStream, new FileWriterStream(file, standardOpenOptionArr));
    }

    private static <T> void stream(ReadStream<T> readStream, WriteStream<T> writeStream) {
        CountDownLatch countDownLatch = new CountDownLatch(1);
        readStream.pipeTo(writeStream, asyncResult -> {
            countDownLatch.countDown();
        });
        readStream.resume();
        try {
            countDownLatch.await();
        } catch (InterruptedException e) {
            logger.error(e.getMessage(), e);
            Thread.currentThread().interrupt();
        }
    }

    private static <T> CompletableFuture<Void> asyncStream(ReadStream<T> readStream, WriteStream<T> writeStream) {
        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
        readStream.pipeTo(writeStream, asyncResult -> {
            completableFuture.complete(null);
        });
        readStream.resume();
        return completableFuture;
    }

    public static <T> Stream simpleValue(final T t, final Function<T, byte[]> function) {
        final AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        return VertxStream.stream(new GenericStream<T>() { // from class: net.bluemind.core.rest.base.GenericStream.1
            @Override // net.bluemind.core.rest.base.GenericStream
            protected StreamState<T> next() throws Exception {
                if (atomicBoolean.get()) {
                    return StreamState.end();
                }
                atomicBoolean.set(true);
                return StreamState.data(t);
            }

            @Override // net.bluemind.core.rest.base.GenericStream
            protected Buffer serialize(T t2) throws Exception {
                return Buffer.buffer((byte[]) function.apply(t2));
            }
        });
    }

    /* renamed from: handler, reason: collision with other method in class */
    public /* bridge */ /* synthetic */ ReadStream m77handler(Handler handler) {
        return handler((Handler<Buffer>) handler);
    }

    /* renamed from: endHandler, reason: collision with other method in class */
    public /* bridge */ /* synthetic */ ReadStream m78endHandler(Handler handler) {
        return endHandler((Handler<Void>) handler);
    }

    /* renamed from: exceptionHandler, reason: collision with other method in class */
    public /* bridge */ /* synthetic */ StreamBase m81exceptionHandler(Handler handler) {
        return exceptionHandler((Handler<Throwable>) handler);
    }

    /* renamed from: exceptionHandler, reason: collision with other method in class */
    public /* bridge */ /* synthetic */ ReadStream m82exceptionHandler(Handler handler) {
        return exceptionHandler((Handler<Throwable>) handler);
    }
}
