package net.bluemind.core.backup.continuous.mgmt.service.tests;

import io.netty.util.AsciiString;
import java.io.ByteArrayInputStream;
import java.util.Collections;
import java.util.Iterator;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.LongAdder;
import net.bluemind.backend.cyrus.replication.testhelper.CyrusReplicationHelper;
import net.bluemind.backend.cyrus.replication.testhelper.ExpectCommand;
import net.bluemind.backend.cyrus.replication.testhelper.SyncServerHelper;
import net.bluemind.config.Token;
import net.bluemind.core.backup.continuous.DefaultBackupStore;
import net.bluemind.core.backup.continuous.ILiveStream;
import net.bluemind.core.backup.continuous.api.InstallationWriteLeader;
import net.bluemind.core.backup.continuous.leader.DefaultLeader;
import net.bluemind.core.backup.continuous.mgmt.api.BackupSyncOptions;
import net.bluemind.core.backup.continuous.mgmt.api.IContinuousBackupMgmt;
import net.bluemind.core.backup.continuous.store.TopicSubscriber;
import net.bluemind.core.backup.store.kafka.KafkaTopicStore;
import net.bluemind.core.context.SecurityContext;
import net.bluemind.core.elasticsearch.ElasticsearchTestHelper;
import net.bluemind.core.jdbc.JdbcTestHelper;
import net.bluemind.core.rest.ServerSideServiceProvider;
import net.bluemind.core.rest.http.ClientSideServiceProvider;
import net.bluemind.core.task.api.TaskRef;
import net.bluemind.core.task.service.TaskUtils;
import net.bluemind.group.api.IGroup;
import net.bluemind.group.api.Member;
import net.bluemind.imap.FlagsList;
import net.bluemind.imap.ListInfo;
import net.bluemind.imap.StoreClient;
import net.bluemind.kafka.container.ZkKafkaContainer;
import net.bluemind.lib.vertx.VertxPlatform;
import net.bluemind.mailbox.api.Mailbox;
import net.bluemind.pool.impl.BmConfIni;
import net.bluemind.server.api.Server;
import net.bluemind.tests.defaultdata.PopulateHelper;
import org.awaitility.Awaitility;
import org.awaitility.core.ConditionFactory;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:net/bluemind/core/backup/continuous/mgmt/service/tests/SyncToKafkaStoreTests.class */
public class SyncToKafkaStoreTests {
    private ZkKafkaContainer kafka;
    private String cyrusIp;
    private CyrusReplicationHelper cyrusReplication;

    @Before
    public void before() throws Exception {
        this.kafka = new ZkKafkaContainer();
        this.kafka.start();
        String inspectAddress = this.kafka.inspectAddress();
        System.setProperty("bm.kafka.bootstrap.servers", String.valueOf(inspectAddress) + ":9093");
        System.setProperty("bm.zk.servers", String.valueOf(inspectAddress) + ":2181");
        DefaultLeader.reset();
        JdbcTestHelper.getInstance().beforeTest();
        VertxPlatform.spawnBlocking(10L, TimeUnit.SECONDS);
        ConditionFactory atMost = Awaitility.await().atMost(20L, TimeUnit.SECONDS);
        InstallationWriteLeader leader = DefaultLeader.leader();
        leader.getClass();
        atMost.until(leader::isLeader);
        BmConfIni bmConfIni = new BmConfIni();
        Server server = new Server();
        server.ip = ElasticsearchTestHelper.getInstance().getHost();
        System.out.println("ES is " + server.ip);
        Assert.assertNotNull(server.ip);
        server.tags = Collections.singletonList("bm/es");
        this.cyrusIp = bmConfIni.get("imap-role");
        Server server2 = new Server();
        server2.ip = this.cyrusIp;
        server2.tags = Collections.singletonList("mail/imap");
        PopulateHelper.initGlobalVirt(new Server[]{server, server2});
        ElasticsearchTestHelper.getInstance().beforeTest();
        System.err.println("Add global.virt");
        PopulateHelper.addDomainAdmin("admin0", "global.virt", Mailbox.Routing.none);
        System.err.println("Pause store then init domain and john01");
        DefaultBackupStore.store().pause();
        System.err.println("Add devenv.blue");
        PopulateHelper.addDomain("devenv.blue", Mailbox.Routing.none, new String[]{"devenv.bm"});
        System.err.println("devenv.blue added.");
        this.cyrusReplication = new CyrusReplicationHelper(this.cyrusIp);
        System.err.println("Replication setup starts.");
        this.cyrusReplication.installReplication();
        SyncServerHelper.waitFor();
        this.cyrusReplication.startReplication().get(5L, TimeUnit.SECONDS);
        ExpectCommand expectCommand = new ExpectCommand();
        String addUser = PopulateHelper.addUser("john01", "devenv.blue", Mailbox.Routing.internal, new String[0]);
        System.err.println("Add john01 returned.");
        fillMailbox(expectCommand, "john01@devenv.blue");
        String addGroup = PopulateHelper.addGroup("devenv.blue", "brotherhood", "confrérie", Collections.singletonList(Member.user(addUser)));
        IGroup iGroup = (IGroup) ServerSideServiceProvider.getProvider(SecurityContext.SYSTEM).instance(IGroup.class, new String[]{"devenv.blue"});
        System.err.println("group is " + iGroup.getComplete(addGroup));
        System.err.println("Resume store then create jane01");
        DefaultBackupStore.store().resume();
        String addUser2 = PopulateHelper.addUser("jane01", "devenv.blue", Mailbox.Routing.internal, new String[0]);
        System.err.println("Add jane01 returned.");
        fillMailbox(expectCommand, "jane01@devenv.blue");
        System.err.println("Add jane the group");
        iGroup.add(addGroup, Collections.singletonList(Member.user(addUser2)));
    }

    /* JADX WARN: Finally extract failed */
    private void fillMailbox(ExpectCommand expectCommand, String str) throws InterruptedException, ExecutionException, TimeoutException {
        Throwable th = null;
        try {
            StoreClient storeClient = new StoreClient(this.cyrusIp, 1143, str, "yeah");
            try {
                Assert.assertTrue(storeClient.login());
                Iterator it = storeClient.listAll().iterator();
                while (it.hasNext()) {
                    ListInfo listInfo = (ListInfo) it.next();
                    if (listInfo.isSelectable()) {
                        String str2 = "From: wick" + UUID.randomUUID().toString() + "@gmail.com\r\nSubject: yeah " + UUID.randomUUID().toString() + "\r\n\r\n";
                        CompletableFuture onNextApplyMessage = expectCommand.onNextApplyMessage();
                        Assert.assertTrue(storeClient.append(listInfo.getName(), new ByteArrayInputStream(str2.getBytes()), new FlagsList()) > 0);
                        onNextApplyMessage.get(2L, TimeUnit.SECONDS);
                    }
                }
                if (storeClient != null) {
                    storeClient.close();
                }
            } catch (Throwable th2) {
                if (storeClient != null) {
                    storeClient.close();
                }
                throw th2;
            }
        } catch (Throwable th3) {
            if (0 == 0) {
                th = th3;
            } else if (null != th3) {
                th.addSuppressed(th3);
            }
            throw th;
        }
    }

    @After
    public void after() throws Exception {
        System.err.println("after starts.");
        DefaultLeader.leader().releaseLeadership();
        this.kafka.stop();
        JdbcTestHelper.getInstance().afterTest();
    }

    @Test
    public void testTaskRef() {
        ClientSideServiceProvider provider = ClientSideServiceProvider.getProvider("http://127.0.0.1:8090", Token.admin0());
        IContinuousBackupMgmt iContinuousBackupMgmt = (IContinuousBackupMgmt) provider.instance(IContinuousBackupMgmt.class, new String[0]);
        System.err.println("***** Starting sync process *****");
        TaskRef syncWithStore = iContinuousBackupMgmt.syncWithStore(new BackupSyncOptions());
        AtomicBoolean atomicBoolean = new AtomicBoolean();
        System.err.println("status: " + TaskUtils.wait(provider, syncWithStore, str -> {
            if (str != null) {
                System.err.println(str);
                if (str.contains("continuous.NoopStore")) {
                    atomicBoolean.set(true);
                }
            }
        }));
        Assert.assertFalse("NoopStore should not be used, we want kafka stuff.", atomicBoolean.get());
        KafkaTopicStore kafkaTopicStore = new KafkaTopicStore();
        TopicSubscriber subscriber = kafkaTopicStore.getSubscriber("syncnoid-__orphans__");
        LongAdder longAdder = new LongAdder();
        subscriber.subscribe((bArr, bArr2, i, j) -> {
            System.err.println(" * " + new AsciiString(bArr));
            longAdder.increment();
        });
        Assert.assertTrue("We expected records in sync orphans topic", longAdder.sum() > 0);
        TopicSubscriber subscriber2 = kafkaTopicStore.getSubscriber("syncnoid-devenv.blue");
        LongAdder longAdder2 = new LongAdder();
        subscriber2.subscribe((bArr3, bArr4, i2, j2) -> {
            System.err.println(" * " + new AsciiString(bArr3));
            longAdder2.increment();
        });
        Assert.assertTrue("We expected records in sync devenv.blue topic", longAdder2.sum() > 0);
        Iterator it = DefaultBackupStore.reader().forInstallation("syncnoid").listAvailable().iterator();
        while (it.hasNext()) {
            System.err.println(" * reader has " + ((ILiveStream) it.next()));
        }
        Assert.assertEquals(2L, r0.size());
    }
}
