/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.connect.mirror;

import java.util.Arrays;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.mirror.MirrorCheckpointConfig;
import org.apache.kafka.connect.mirror.MirrorUtils;
import org.apache.kafka.connect.mirror.OffsetSync;
import org.apache.kafka.connect.util.KafkaBasedLog;
import org.apache.kafka.connect.util.TopicAdmin;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * In-memory store of {@link OffsetSync} records, used to translate upstream (source cluster)
 * consumer offsets into downstream (target cluster) offsets.
 *
 * <p>For every source {@link TopicPartition} the store keeps a fixed-size array of
 * {@link #SYNCS_PER_PARTITION} syncs. Index 0 always holds the most recently applied sync; higher
 * indices hold progressively older syncs. Distinct entries at indices {@code i < j} are kept within
 * the bounds checked by {@link #invariantB} and {@link #invariantC}:
 * <ul>
 *   <li>B: {@code syncs[i].upstream <= syncs[j].upstream + 2^j - 2^i} (older entries are not too old)</li>
 *   <li>C: {@code syncs[i].upstream >= syncs[j].upstream + 2^max(i-2, 0)} (entries are spaced apart)</li>
 * </ul>
 *
 * <p>Records are fed in through {@link #handleRecord(ConsumerRecord)}, which is registered as the
 * consume callback of the backing {@link KafkaBasedLog}. Each update replaces the per-partition
 * array with a modified copy, so an array already published in the map is never mutated in place.
 */
public class OffsetSyncStore implements AutoCloseable {
    private static final Logger log = LoggerFactory.getLogger(OffsetSyncStore.class);

    /** Number of syncs retained per topic-partition: one for each bit of a {@code long} offset. */
    static final int SYNCS_PER_PARTITION = Long.SIZE;

    private final KafkaBasedLog<byte[], byte[]> backingStore;
    private final Map<TopicPartition, OffsetSync[]> offsetSyncs = new ConcurrentHashMap<>();
    private final TopicAdmin admin;

    /** When true, syncs seen before {@link #readToEnd} is set overwrite the whole array (only the last one survives). */
    protected volatile boolean initializationMustReadToEnd = true;

    /** Set once {@link #start(boolean)} has returned from starting the backing store; translation is refused until then. */
    protected volatile boolean readToEnd = false;

    /**
     * Creates a store backed by the offset-syncs topic described by {@code config}.
     * If any client fails to construct, the clients created so far are closed and the failure is rethrown.
     *
     * @param config checkpoint configuration supplying consumer/admin settings and the topic name
     */
    OffsetSyncStore(MirrorCheckpointConfig config) {
        KafkaBasedLog<byte[], byte[]> store;
        KafkaConsumer<byte[], byte[]> consumer = null;
        TopicAdmin admin = null;
        try {
            consumer = MirrorUtils.newConsumer(config.offsetSyncsTopicConsumerConfig());
            admin = new TopicAdmin(
                    config.offsetSyncsTopicAdminConfig(),
                    config.forwardingAdmin(config.offsetSyncsTopicAdminConfig()));
            store = createBackingStore(config, consumer, admin);
        } catch (Throwable t) {
            // Avoid leaking half-constructed clients when construction fails part-way.
            Utils.closeQuietly(consumer, "consumer for offset syncs");
            Utils.closeQuietly(admin, "admin client for offset syncs");
            throw t;
        }
        this.admin = admin;
        this.backingStore = store;
    }

    /**
     * Builds the {@link KafkaBasedLog} over the offset-syncs topic. No producer is supplied, every
     * consumed record is routed to {@link #handleRecord(ConsumerRecord)}, and the partition predicate
     * selects partition 0 only.
     */
    private KafkaBasedLog<byte[], byte[]> createBackingStore(MirrorCheckpointConfig config,
                                                             Consumer<byte[], byte[]> consumer,
                                                             TopicAdmin admin) {
        return KafkaBasedLog.withExistingClients(
                config.offsetSyncsTopic(),
                consumer,
                null,
                admin,
                (error, record) -> this.handleRecord(record),
                Time.SYSTEM,
                ignored -> { },
                topicPartition -> topicPartition.partition() == 0);
    }

    /**
     * Creates a store with no backing log and no admin client. {@link #backingStoreStart()} must be
     * overridden before {@link #start(boolean)} is usable on such an instance.
     */
    OffsetSyncStore() {
        this.admin = null;
        this.backingStore = null;
    }

    /**
     * Starts the backing store and, once that call returns, marks this store as read-to-end so that
     * {@link #translateDownstream} begins answering.
     *
     * @param initializationMustReadToEnd if true, syncs applied before the store is marked
     *                                    read-to-end each overwrite all earlier syncs for their partition
     */
    public void start(boolean initializationMustReadToEnd) {
        this.initializationMustReadToEnd = initializationMustReadToEnd;
        log.debug("OffsetSyncStore starting - must read to OffsetSync end = {}", initializationMustReadToEnd);
        backingStoreStart();
        readToEnd = true;
    }

    /** Starts the backing log; package-private so that it can be overridden. */
    void backingStoreStart() {
        backingStore.start(false);
    }

    /**
     * Translates an upstream consumer offset into a downstream offset using the stored syncs.
     *
     * @param group                consumer group name, used for logging only
     * @param sourceTopicPartition upstream topic-partition the offset belongs to
     * @param upstreamOffset       upstream offset to translate
     * @return empty if the store has not been marked read-to-end or no syncs exist for the partition;
     *         {@code -1} if even the selected sync lies after {@code upstreamOffset}; otherwise the
     *         sync's downstream offset, plus one when {@code upstreamOffset} is past the sync's
     *         upstream offset
     */
    public OptionalLong translateDownstream(String group, TopicPartition sourceTopicPartition, long upstreamOffset) {
        if (!readToEnd) {
            log.debug("translateDownstream({},{},{}): Skipped (initial offset syncs read still in progress)",
                    group, sourceTopicPartition, upstreamOffset);
            return OptionalLong.empty();
        }
        Optional<OffsetSync> offsetSync = latestOffsetSync(sourceTopicPartition, upstreamOffset);
        if (!offsetSync.isPresent()) {
            log.debug("translateDownstream({},{},{}): Skipped (offset sync not found)",
                    group, sourceTopicPartition, upstreamOffset);
            return OptionalLong.empty();
        }
        OffsetSync sync = offsetSync.get();
        if (sync.upstreamOffset() > upstreamOffset) {
            log.debug("translateDownstream({},{},{}): Skipped ({} is ahead of upstream consumer group {})",
                    group, sourceTopicPartition, upstreamOffset, sync, upstreamOffset);
            return OptionalLong.of(-1L);
        }
        // Past the sync point the exact mapping is unknown, so advance by at most one downstream offset.
        long upstreamStep = upstreamOffset == sync.upstreamOffset() ? 0L : 1L;
        long translated = sync.downstreamOffset() + upstreamStep;
        log.debug("translateDownstream({},{},{}): Translated {} (relative to {})",
                group, sourceTopicPartition, upstreamOffset, translated, sync);
        return OptionalLong.of(translated);
    }

    /** Stops the backing store (if any) and closes the admin client, suppressing any failures. */
    @Override
    public void close() {
        Utils.closeQuietly(backingStore != null ? backingStore::stop : null, "backing store for offset syncs");
        Utils.closeQuietly(admin, "admin client for offset syncs");
    }

    /**
     * Deserializes one record from the offset-syncs topic and folds it into the per-partition array,
     * creating the array on first sight of a partition.
     *
     * @param record raw record consumed from the offset-syncs topic
     */
    protected void handleRecord(ConsumerRecord<byte[], byte[]> record) {
        OffsetSync offsetSync = OffsetSync.deserializeRecord(record);
        TopicPartition sourceTopicPartition = offsetSync.topicPartition();
        offsetSyncs.compute(sourceTopicPartition, (ignored, syncs) ->
                syncs == null
                        ? createInitialSyncs(offsetSync)
                        : updateExistingSyncs(syncs, offsetSync));
    }

    /**
     * Applies {@code offsetSync} to a copy of {@code syncs} and returns the copy, leaving the
     * array that is currently published in the map untouched.
     */
    private OffsetSync[] updateExistingSyncs(OffsetSync[] syncs, OffsetSync offsetSync) {
        OffsetSync[] mutableSyncs = Arrays.copyOf(syncs, SYNCS_PER_PARTITION);
        updateSyncArray(mutableSyncs, syncs, offsetSync);
        if (log.isTraceEnabled()) {
            log.trace("New sync {} applied, new state is {}", offsetSync, offsetArrayToString(mutableSyncs));
        }
        return mutableSyncs;
    }

    /**
     * Renders the array as {@code [upstream:downstream,...]}, collapsing runs of the same sync
     * instance into a single entry.
     */
    private String offsetArrayToString(OffsetSync[] syncs) {
        StringBuilder stateString = new StringBuilder();
        stateString.append("[");
        for (int i = 0; i < SYNCS_PER_PARTITION; i++) {
            boolean repeatsPrevious = i != 0 && syncs[i] == syncs[i - 1];
            if (repeatsPrevious) {
                continue;
            }
            if (i != 0) {
                stateString.append(",");
            }
            stateString.append(syncs[i].upstreamOffset());
            stateString.append(":");
            stateString.append(syncs[i].downstreamOffset());
        }
        stateString.append("]");
        return stateString.toString();
    }

    /** Creates a fresh array in which every slot refers to {@code firstSync}. */
    private OffsetSync[] createInitialSyncs(OffsetSync firstSync) {
        OffsetSync[] syncs = new OffsetSync[SYNCS_PER_PARTITION];
        clearSyncArray(syncs, firstSync);
        return syncs;
    }

    /** Overwrites every slot of {@code syncs} with {@code offsetSync}. */
    private void clearSyncArray(OffsetSync[] syncs, OffsetSync offsetSync) {
        Arrays.fill(syncs, offsetSync);
    }

    /**
     * Inserts {@code offsetSync} at index 0 of {@code syncs} and repairs the later slots so that
     * invariants B and C hold again.
     *
     * @param syncs      working copy that is modified in place
     * @param original   the array as it was before this update; read-only source of older syncs
     *                   that may be promoted to higher indices
     * @param offsetSync the newly received sync
     */
    private void updateSyncArray(OffsetSync[] syncs, OffsetSync[] original, OffsetSync offsetSync) {
        long upstreamOffset = offsetSync.upstreamOffset();

        // During the initial read (when required) only the last sync is kept; and if the upstream
        // offset moved backwards, every stored sync is discarded in favour of the new one.
        boolean onlyLoadLastOffset = !readToEnd && initializationMustReadToEnd;
        boolean upstreamRewind = upstreamOffset < syncs[0].upstreamOffset();
        if (onlyLoadLastOffset || upstreamRewind) {
            clearSyncArray(syncs, offsetSync);
            return;
        }

        OffsetSync replacement = offsetSync;
        // Index into `original` of the next old sync to consider as a replacement.
        int replacementIndex = 0;
        // The newest sync always occupies slot 0.
        syncs[0] = replacement;
        for (int current = 1; current < SYNCS_PER_PARTITION; current++) {
            int previous = current - 1;

            // Prefer re-using an old sync over the current replacement where the invariants allow,
            // so that more distinct syncs stay in the array.
            boolean skipOldValue;
            do {
                OffsetSync oldValue = original[replacementIndex];
                boolean isRecent = invariantB(syncs[previous], oldValue, previous, current);
                boolean separatedFromPrevious = invariantC(syncs[previous], oldValue, previous);
                int next = current + 1;
                boolean separatedFromNext = next >= SYNCS_PER_PARTITION || invariantC(oldValue, syncs[next], current);
                int nextReplacement = replacementIndex + 1;
                boolean duplicate = nextReplacement < SYNCS_PER_PARTITION && oldValue == original[nextReplacement];

                boolean promoteOldValueToReplacement = isRecent && separatedFromPrevious && separatedFromNext;
                if (promoteOldValueToReplacement) {
                    replacement = oldValue;
                }
                // Skip without promoting when the old value cannot be used at the next index either.
                skipOldValue = duplicate || !separatedFromPrevious;
                if (promoteOldValueToReplacement || skipOldValue) {
                    replacementIndex++;
                }
                // Keep skipping while needed, but never let the source index reach the slot being filled.
            } while (replacementIndex < current && skipOldValue);

            assert invariantB(syncs[previous], replacement, previous, current);
            assert invariantC(syncs[previous], replacement, previous);

            if (invariantB(syncs[previous], syncs[current], previous, current)) {
                // The existing value is still recent enough; the remaining tail is left untouched.
                break;
            }
            // The existing value is now too old relative to its predecessor: swap in the replacement.
            syncs[current] = replacement;

            assert invariantB(syncs[previous], syncs[current], previous, current);
            assert invariantC(syncs[previous], syncs[current], previous);
        }
    }

    /**
     * Invariant B for indices {@code i < j}: holds when both slots refer to the same sync, when the
     * computed bound is negative, or when {@code iSync.upstream <= jSync.upstream + 2^j - 2^i}.
     */
    private boolean invariantB(OffsetSync iSync, OffsetSync jSync, int i, int j) {
        long bound = jSync.upstreamOffset() + (1L << j) - (1L << i);
        return iSync == jSync || bound < 0L || iSync.upstreamOffset() <= bound;
    }

    /**
     * Invariant C for index {@code i}: holds when both slots refer to the same sync, or when the
     * computed bound is non-negative and {@code iSync.upstream >= jSync.upstream + 2^max(i-2, 0)}.
     */
    private boolean invariantC(OffsetSync iSync, OffsetSync jSync, int i) {
        long bound = jSync.upstreamOffset() + (1L << Math.max(i - 2, 0));
        return iSync == jSync || (bound >= 0L && iSync.upstreamOffset() >= bound);
    }

    /** Looks up the sync to use for {@code upstreamOffset}, or empty if the partition has no syncs. */
    private Optional<OffsetSync> latestOffsetSync(TopicPartition topicPartition, long upstreamOffset) {
        return Optional.ofNullable(offsetSyncs.get(topicPartition))
                .map(syncs -> lookupLatestSync(syncs, upstreamOffset));
    }

    /**
     * Scans from index 0 upwards and returns the first sync whose upstream offset is not greater
     * than {@code upstreamOffset}; falls back to the last slot when every sync is ahead of it.
     */
    private OffsetSync lookupLatestSync(OffsetSync[] syncs, long upstreamOffset) {
        for (int i = 0; i < SYNCS_PER_PARTITION; i++) {
            OffsetSync offsetSync = syncs[i];
            if (offsetSync.upstreamOffset() <= upstreamOffset) {
                return offsetSync;
            }
        }
        return syncs[SYNCS_PER_PARTITION - 1];
    }

    /**
     * Returns the sync stored at {@code syncIdx} for {@code topicPartition}.
     *
     * @throws IllegalArgumentException if the partition has no syncs or {@code syncIdx} is out of range
     */
    OffsetSync syncFor(TopicPartition topicPartition, int syncIdx) {
        OffsetSync[] syncs = offsetSyncs.get(topicPartition);
        if (syncs == null) {
            throw new IllegalArgumentException("No syncs present for " + topicPartition);
        }
        if (syncIdx < 0) {
            throw new IllegalArgumentException("Requested negative sync index " + syncIdx + " for " + topicPartition);
        }
        if (syncIdx >= syncs.length) {
            throw new IllegalArgumentException("Requested sync " + (syncIdx + 1) + " for " + topicPartition
                    + " but there are only " + syncs.length + " syncs available for that topic partition");
        }
        return syncs[syncIdx];
    }
}

