package net.bluemind.backend.mail.parsing;

import com.google.common.io.CountingInputStream;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.streams.ReadStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.channels.SeekableByteChannel;
import java.nio.file.Files;
import java.nio.file.OpenOption;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.nio.file.attribute.FileAttribute;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;
import java.util.function.Function;
import net.bluemind.core.api.Stream;
import net.bluemind.core.rest.vertx.VertxStream;
import net.bluemind.jna.utils.MemfdSupport;
import net.bluemind.lib.vertx.utils.MemfdWriteStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:net/bluemind/backend/mail/parsing/EZInputStreamAdapter.class */
public class EZInputStreamAdapter {
    private static final Logger logger = LoggerFactory.getLogger(EZInputStreamAdapter.class);

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:net/bluemind/backend/mail/parsing/EZInputStreamAdapter$AdaptException.class */
    public static class AdaptException extends RuntimeException {
        public AdaptException(Throwable th) {
            super(th);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:net/bluemind/backend/mail/parsing/EZInputStreamAdapter$ResetableOutput.class */
    public static class ResetableOutput {
        private final Path file;
        private final SeekableByteChannel fileOut;
        private boolean closed;
        private boolean reset;

        public ResetableOutput(Path path) {
            this.file = path;
            try {
                this.fileOut = Files.newByteChannel(this.file, StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING, StandardOpenOption.WRITE);
            } catch (IOException e) {
                throw new AdaptException(e);
            }
        }

        public void write(ByteBuffer byteBuffer) throws IOException {
            this.fileOut.write(byteBuffer);
        }

        public void close() {
            try {
                this.fileOut.close();
            } catch (Exception e) {
                EZInputStreamAdapter.logger.error(e.getMessage(), e);
            }
            this.closed = true;
        }

        public InputStream input() throws IOException {
            return Files.newInputStream(this.file, new OpenOption[0]);
        }

        public void reset() {
            try {
                Files.delete(this.file);
            } catch (IOException unused) {
            }
            this.reset = true;
        }

        protected void finalize() throws Throwable {
            if (!this.closed) {
                EZInputStreamAdapter.logger.warn("Closing {} from finalize", this);
                close();
            }
            if (!this.reset) {
                EZInputStreamAdapter.logger.warn("Reset {} from finalize", this);
                reset();
            }
            super.finalize();
        }
    }

    private EZInputStreamAdapter() {
    }

    private static ResetableOutput output() {
        try {
            return new ResetableOutput(Files.createTempFile("ez-is-adapt", ".stream", new FileAttribute[0]));
        } catch (IOException e) {
            throw new AdaptException(e);
        }
    }

    public static <T> CompletableFuture<T> consume(Stream stream, Function<CountingInputStream, T> function) {
        CompletableFuture<T> completableFuture = new CompletableFuture<>();
        try {
            setupStreamHandlers(function, completableFuture, VertxStream.read(stream));
        } catch (Exception e) {
            logger.error("Error setting up stream handlers " + e.getMessage(), e);
            completableFuture.completeExceptionally(e);
        }
        return completableFuture;
    }

    private static <T> void setupStreamHandlers(Function<CountingInputStream, T> function, CompletableFuture<T> completableFuture, ReadStream<Buffer> readStream) {
        if (readStream instanceof VertxStream.LocalPathStream) {
            processLocalPath(function, completableFuture, (VertxStream.LocalPathStream) readStream);
        } else if (MemfdSupport.isAvailable()) {
            processWithMemfd(function, completableFuture, readStream);
        } else {
            processWithTemporaryFile(function, completableFuture, readStream);
        }
    }

    private static <T> void processWithTemporaryFile(Function<CountingInputStream, T> function, CompletableFuture<T> completableFuture, ReadStream<Buffer> readStream) {
        ResetableOutput output = output();
        readStream.endHandler(r7 -> {
            output.close();
            try {
                Throwable th = null;
                try {
                    CountingInputStream countingInputStream = new CountingInputStream(output.input());
                    try {
                        completableFuture.complete(function.apply(countingInputStream));
                        if (countingInputStream != null) {
                            countingInputStream.close();
                        }
                    } catch (Throwable th2) {
                        if (countingInputStream != null) {
                            countingInputStream.close();
                        }
                        throw th2;
                    }
                } catch (Throwable th3) {
                    if (0 == 0) {
                        th = th3;
                    } else if (null != th3) {
                        th.addSuppressed(th3);
                    }
                    throw th;
                }
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
                completableFuture.completeExceptionally(e);
            } finally {
                output.reset();
            }
        });
        readStream.handler(buffer -> {
            try {
                output.write(buffer.getByteBuf().nioBuffer());
            } catch (IOException e) {
                logger.error(e.getMessage());
            }
        });
        readStream.exceptionHandler(th -> {
            logger.error(th.getMessage(), th);
            output.close();
            output.reset();
        });
        logger.debug("resume {}...", readStream);
        readStream.resume();
    }

    private static <T> void processWithMemfd(Function<CountingInputStream, T> function, CompletableFuture<T> completableFuture, ReadStream<Buffer> readStream) {
        MemfdWriteStream memfdWriteStream = new MemfdWriteStream();
        readStream.pipeTo(memfdWriteStream);
        memfdWriteStream.result().whenComplete((BiConsumer) (offHeapTemporaryFile, th) -> {
            try {
                if (th != null) {
                    logger.error(th.getMessage(), th);
                    completableFuture.completeExceptionally(th);
                    return;
                }
                Throwable th = null;
                try {
                    CountingInputStream countingInputStream = new CountingInputStream(offHeapTemporaryFile.openForReading());
                    try {
                        Object apply = function.apply(countingInputStream);
                        logger.debug("MEMFD adapted for {}", Long.valueOf(countingInputStream.getCount()));
                        completableFuture.complete(apply);
                        if (countingInputStream != null) {
                            countingInputStream.close();
                        }
                    } catch (Throwable th2) {
                        if (countingInputStream != null) {
                            countingInputStream.close();
                        }
                        throw th2;
                    }
                } catch (Throwable th3) {
                    if (0 == 0) {
                        th = th3;
                    } else if (null != th3) {
                        th.addSuppressed(th3);
                    }
                    throw th;
                }
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
                completableFuture.completeExceptionally(th);
            } finally {
                offHeapTemporaryFile.close();
            }
        });
        readStream.resume();
    }

    private static <T> void processLocalPath(Function<CountingInputStream, T> function, CompletableFuture<T> completableFuture, VertxStream.LocalPathStream localPathStream) {
        Throwable th = null;
        try {
            try {
                CountingInputStream countingInputStream = new CountingInputStream(Files.newInputStream(localPathStream.path(), new OpenOption[0]));
                try {
                    completableFuture.complete(function.apply(countingInputStream));
                    if (countingInputStream != null) {
                        countingInputStream.close();
                    }
                } catch (Throwable th2) {
                    if (countingInputStream != null) {
                        countingInputStream.close();
                    }
                    throw th2;
                }
            } catch (Throwable th3) {
                if (0 == 0) {
                    th = th3;
                } else if (null != th3) {
                    th.addSuppressed(th3);
                }
                throw th;
            }
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
            completableFuture.completeExceptionally(e);
        }
    }
}
