/*
 * Decompiled with CFR 0.152.
 */
package com.ververica.cdc.connectors.mysql.debezium.task;

import com.github.shyiko.mysql.binlog.event.Event;
import com.ververica.cdc.connectors.mysql.debezium.dispatcher.EventDispatcherImpl;
import com.ververica.cdc.connectors.mysql.debezium.dispatcher.SignalEventDispatcher;
import com.ververica.cdc.connectors.mysql.debezium.reader.SnapshotSplitReader;
import com.ververica.cdc.connectors.mysql.source.offset.BinlogOffset;
import com.ververica.cdc.connectors.mysql.source.split.MySqlBinlogSplit;
import com.ververica.cdc.connectors.mysql.source.utils.RecordUtils;
import io.debezium.DebeziumException;
import io.debezium.connector.mysql.MySqlConnection;
import io.debezium.connector.mysql.MySqlConnectorConfig;
import io.debezium.connector.mysql.MySqlOffsetContext;
import io.debezium.connector.mysql.MySqlStreamingChangeEventSource;
import io.debezium.connector.mysql.MySqlStreamingChangeEventSourceMetrics;
import io.debezium.connector.mysql.MySqlTaskContext;
import io.debezium.pipeline.ErrorHandler;
import io.debezium.pipeline.source.spi.ChangeEventSource;
import io.debezium.relational.TableId;
import io.debezium.util.Clock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MySqlBinlogSplitReadTask
extends MySqlStreamingChangeEventSource {
    private static final Logger LOG = LoggerFactory.getLogger(MySqlBinlogSplitReadTask.class);
    private final MySqlBinlogSplit binlogSplit;
    private final MySqlOffsetContext offsetContext;
    private final EventDispatcherImpl<TableId> eventDispatcher;
    private final SignalEventDispatcher signalEventDispatcher;
    private final ErrorHandler errorHandler;
    private ChangeEventSource.ChangeEventSourceContext context;

    public MySqlBinlogSplitReadTask(MySqlConnectorConfig connectorConfig, MySqlOffsetContext offsetContext, MySqlConnection connection, EventDispatcherImpl<TableId> dispatcher, ErrorHandler errorHandler, Clock clock, MySqlTaskContext taskContext, MySqlStreamingChangeEventSourceMetrics metrics, String topic, MySqlBinlogSplit binlogSplit) {
        super(connectorConfig, offsetContext, connection, dispatcher, errorHandler, clock, taskContext, metrics);
        this.binlogSplit = binlogSplit;
        this.eventDispatcher = dispatcher;
        this.offsetContext = offsetContext;
        this.errorHandler = errorHandler;
        this.signalEventDispatcher = new SignalEventDispatcher(offsetContext.getPartition(), topic, this.eventDispatcher.getQueue());
    }

    public void execute(ChangeEventSource.ChangeEventSourceContext context) throws InterruptedException {
        this.context = context;
        super.execute(context);
    }

    protected void handleEvent(Event event) {
        BinlogOffset currentBinlogOffset;
        super.handleEvent(event);
        if (this.isBoundedRead() && (currentBinlogOffset = RecordUtils.getBinlogPosition(this.offsetContext.getOffset())).isAtOrAfter(this.binlogSplit.getEndingOffset())) {
            try {
                this.signalEventDispatcher.dispatchWatermarkEvent(this.binlogSplit, currentBinlogOffset, SignalEventDispatcher.WatermarkKind.BINLOG_END);
            }
            catch (InterruptedException e) {
                LOG.error("Send signal event error.", (Throwable)e);
                this.errorHandler.setProducerThrowable((Throwable)new DebeziumException("Error processing binlog signal event", (Throwable)e));
            }
            ((SnapshotSplitReader.SnapshotBinlogSplitChangeEventSourceContextImpl)this.context).finished();
        }
    }

    private boolean isBoundedRead() {
        return !BinlogOffset.NO_STOPPING_OFFSET.equals(this.binlogSplit.getEndingOffset());
    }
}

