package net.bluemind.cli.mail;

import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch._types.ElasticsearchException;
import co.elastic.clients.elasticsearch._types.query_dsl.MatchAllQuery;
import co.elastic.clients.elasticsearch.core.BulkResponse;
import co.elastic.clients.elasticsearch.core.ScrollResponse;
import java.io.BufferedInputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.OpenOption;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import net.bluemind.cli.cmd.api.CliContext;
import net.bluemind.cli.cmd.api.ICmdLet;
import net.bluemind.cli.cmd.api.ICmdLetRegistration;
import net.bluemind.lib.elasticsearch.ESearchActivator;
import net.bluemind.lib.elasticsearch.EsBulk;
import net.bluemind.lib.elasticsearch.exception.ElasticBulkException;
import net.bluemind.system.api.IInstallation;
import net.bluemind.system.api.PublicInfos;
import org.apache.commons.compress.archivers.ArchiveEntry;
import org.apache.commons.compress.archivers.ArchiveInputStream;
import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream;
import org.apache.commons.compress.utils.IOUtils;
import picocli.CommandLine;

/**
 * CLI command ({@code indexreplicated}, registered in the {@code mail} group) that reads message
 * bodies from the gzipped tar archive at {@link #TAR} and bulk-indexes them through the
 * {@code mailspool_pending_write_alias} alias.
 *
 * <p>Archive entries whose uid (file name up to the first dot) is already present behind the
 * {@code mailspool_pending_read_alias} alias are skipped.
 */
@CommandLine.Command(name = "indexreplicated", description = {"Index pre-replicated messages"})
public class IndexPrereplicatedMailsCommand implements ICmdLet, Runnable {
    private static final String TAR = "/var/spool/bm-replication/bodies.replicated.tgz";
    private static final String INDEX_PENDING_READ_ALIAS = "mailspool_pending_read_alias";
    private static final String INDEX_PENDING_WRITE_ALIAS = "mailspool_pending_write_alias";

    /** Number of bodies accumulated before a bulk request is sent. */
    private static final int BATCH_SIZE = 100;

    /** Page size used when scrolling over the ids that are already indexed. */
    private static final int SCROLL_PAGE_SIZE = 10000;

    /** Keep-alive requested for the scroll context on every page request. */
    private static final String SCROLL_KEEP_ALIVE = "180s";

    /** Context used for console output; assigned by {@link #forContext(CliContext)}. */
    private CliContext ctx;

    /** Value of the optional {@code --progress} flag; it is not read anywhere in this class. */
    @CommandLine.Option(required = false, names = {"--progress"}, description = {"Value indicating the total mails waiting to be indexed"})
    public Long progress;

    /** Registers this command under the {@code mail} group. */
    public static class Reg implements ICmdLetRegistration {
        public Optional<String> group() {
            return Optional.of("mail");
        }

        public Class<? extends ICmdLet> commandClass() {
            return IndexPrereplicatedMailsCommand.class;
        }
    }

    /**
     * Prints the installation version, then extracts and indexes the archive.
     *
     * <p>Exits the JVM with status 0 when the archive does not exist. Any exception raised while
     * extracting or indexing is reported as a warning instead of being propagated.
     */
    @Override
    public void run() {
        PublicInfos infos = CliContext.get().adminApi().instance(IInstallation.class).getInfos();
        ctx.info("infos: " + infos.softwareVersion + " " + infos.releaseName);
        File file = new File(TAR);
        if (!file.exists()) {
            ctx.info("File " + TAR + " not found");
            System.exit(0);
        }
        try {
            extractAndIndex(file);
        } catch (Exception e) {
            ctx.warn("Cannot extract and index mails:\r\n" + ctx.toStack(e));
        }
    }

    /**
     * Streams the archive and indexes the bodies that are not indexed yet, in batches of
     * {@link #BATCH_SIZE}, followed by a final partial batch.
     *
     * @param file the gzipped tar archive to read
     * @throws IOException if the archive cannot be read or the already-indexed ids cannot be fetched
     */
    private void extractAndIndex(File file) throws IOException {
        List<IndexedMessageBody> pending = new ArrayList<>(BATCH_SIZE);
        AtomicLong indexedCount = new AtomicLong();
        ElasticsearchClient client = ESearchActivator.getClient();
        Consumer<IndexedMessageBody> batcher = body -> {
            pending.add(body);
            if (pending.size() == BATCH_SIZE) {
                ctx.info("Indexed " + indexedCount.addAndGet(BATCH_SIZE) + " mailbodies");
                index(client, pending);
                pending.clear();
            }
        };
        Set<String> indexedUids = getIndexedUids(client);
        ctx.info(indexedUids.size() + " mails have already been indexed.");
        // Resources are closed in reverse order of declaration, even when extraction fails.
        try (InputStream fileStream = Files.newInputStream(file.toPath());
                BufferedInputStream buffered = new BufferedInputStream(fileStream);
                GzipCompressorInputStream gzip = new GzipCompressorInputStream(buffered);
                TarArchiveInputStream tar = new TarArchiveInputStream(gzip)) {
            extract(tar, batcher, indexedUids);
        }
        // Flush whatever is left after the last full batch.
        ctx.info("Indexed " + indexedCount.addAndGet(pending.size()) + " mailbodies");
        index(client, pending);
    }

    /**
     * Walks every entry of the archive and hands each body that is not already indexed to
     * {@code consumer}.
     *
     * <p>Directory entries are skipped. A failure on a single entry is logged and the walk goes on
     * with the next entry.
     *
     * @param archiveInputStream the archive stream, positioned before its first entry
     * @param consumer receives each parsed body
     * @param set uids that are already indexed and must be skipped
     * @throws IOException if the next archive entry cannot be read
     */
    private void extract(ArchiveInputStream archiveInputStream, Consumer<IndexedMessageBody> consumer, Set<String> set) throws IOException {
        ArchiveEntry entry;
        while ((entry = archiveInputStream.getNextEntry()) != null) {
            if (entry.isDirectory() || set.contains(uidFromFileName(entry.getName()))) {
                continue;
            }
            try {
                String json = new String(IOUtils.toByteArray(archiveInputStream), StandardCharsets.UTF_8);
                consumer.accept(IndexedMessageBody.fromJson(json));
            } catch (Exception e) {
                ctx.info("Cannot handle file {} --> {}", entry.getName(), e.getMessage());
            }
        }
    }

    /**
     * Returns the part of {@code str} that precedes its first dot, or {@code str} itself when it
     * contains no dot (instead of failing with a {@link StringIndexOutOfBoundsException}).
     *
     * @param str an archive entry name
     * @return the uid derived from the entry name
     */
    private static String uidFromFileName(String str) {
        int dot = str.indexOf('.');
        return dot < 0 ? str : str.substring(0, dot);
    }

    /**
     * Collects the ids of all documents reachable through {@code mailspool_pending_read_alias},
     * using a scroll of {@link #SCROLL_PAGE_SIZE} hits per page without fetching document sources.
     *
     * <p>Scrolling stops once as many hits as the reported total were collected, or as soon as a
     * page comes back empty so that an inconsistent total cannot cause an endless loop. The scroll
     * context is cleared even when a page request fails.
     *
     * @param elasticsearchClient the client used for the search, scroll and clear-scroll requests
     * @return the ids of the documents found
     * @throws ElasticsearchException if Elasticsearch rejects a request
     * @throws IOException if a request cannot be sent or its response cannot be read
     */
    private Set<String> getIndexedUids(ElasticsearchClient elasticsearchClient) throws ElasticsearchException, IOException {
        Set<String> uids = new HashSet<>();
        // search() and scroll() return different response types, hence the separate variables.
        var firstPage = elasticsearchClient.search(search -> search
                .index(INDEX_PENDING_READ_ALIAS)
                .query(MatchAllQuery.of(all -> all)._toQuery())
                .source(source -> source.fetch(false))
                .scroll(keepAlive -> keepAlive.time(SCROLL_KEEP_ALIVE))
                .size(SCROLL_PAGE_SIZE), Void.class);
        String scrollId = firstPage.scrollId();
        try {
            long total = firstPage.hits().total().value();
            int pageSize = firstPage.hits().hits().size();
            firstPage.hits().hits().forEach(hit -> uids.add(hit.id()));
            long seen = pageSize;
            while (seen < total && pageSize > 0) {
                String currentScrollId = scrollId;
                var page = elasticsearchClient.scroll(scroll -> scroll
                        .scrollId(currentScrollId)
                        .scroll(keepAlive -> keepAlive.time(SCROLL_KEEP_ALIVE)), Void.class);
                pageSize = page.hits().hits().size();
                page.hits().hits().forEach(hit -> uids.add(hit.id()));
                seen += pageSize;
                scrollId = page.scrollId();
            }
        } finally {
            String lastScrollId = scrollId;
            if (lastScrollId != null) {
                elasticsearchClient.clearScroll(clear -> clear.scrollId(lastScrollId));
            }
        }
        return uids;
    }

    /**
     * Sends one bulk request indexing every body of {@code list} through
     * {@code mailspool_pending_write_alias}, using each body's uid as document id.
     *
     * <p>Nothing is sent for an empty list. A bulk failure is logged, not rethrown.
     *
     * @param elasticsearchClient the client used for the bulk request
     * @param list the bodies to index
     */
    private void index(ElasticsearchClient elasticsearchClient, List<IndexedMessageBody> list) {
        if (list.isEmpty()) {
            return;
        }
        try {
            new EsBulk(elasticsearchClient)
                    .commitAll(list, (body, operation) -> operation.index(request -> request
                            .index(INDEX_PENDING_WRITE_ALIAS)
                            .id(body.uid)
                            .document(body.toMap())))
                    .ifPresent(this::reportErrors);
        } catch (ElasticBulkException e) {
            ctx.error("Bulk request fails on index: {} while indexing {} bodies", INDEX_PENDING_WRITE_ALIAS, list.size(), e);
        }
    }

    /**
     * Logs one error line per failed item of a bulk response; does nothing when the response
     * reports no error.
     *
     * @param bulkResponse the response of a bulk request
     */
    private void reportErrors(BulkResponse bulkResponse) {
        if (!bulkResponse.errors()) {
            return;
        }
        bulkResponse.items().stream()
                .filter(item -> item.error() != null)
                .forEach(item -> ctx.error("Bulk request failed on item id:{} error:{} stack:{}",
                        item.id(), item.error().type(), item.error().stackTrace()));
    }

    /**
     * Stores the CLI context used for console output.
     *
     * @param cliContext the context to use
     * @return this command, ready to be run
     */
    public Runnable forContext(CliContext cliContext) {
        this.ctx = cliContext;
        return this;
    }
}
