/*
 * Decompiled with CFR 0.152.
 */
package com.ververica.cdc.connectors.mysql.source.utils;

import com.ververica.cdc.connectors.mysql.debezium.dispatcher.SignalEventDispatcher;
import com.ververica.cdc.connectors.mysql.source.offset.BinlogOffset;
import com.ververica.cdc.connectors.mysql.source.split.FinishedSnapshotSplitInfo;
import com.ververica.cdc.connectors.mysql.source.split.MySqlSnapshotSplit;
import io.debezium.data.Envelope;
import io.debezium.document.DocumentReader;
import io.debezium.relational.TableId;
import io.debezium.relational.history.HistoryRecord;
import io.debezium.util.SchemaNameAdjuster;
import java.io.IOException;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.time.Instant;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.flink.table.types.logical.RowType;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;

public class RecordUtils {
    public static final String SCHEMA_CHANGE_EVENT_KEY_NAME = "io.debezium.connector.mysql.SchemaChangeKey";
    public static final String SCHEMA_HEARTBEAT_EVENT_KEY_NAME = "io.debezium.connector.common.Heartbeat";
    private static final DocumentReader DOCUMENT_READER = DocumentReader.defaultReader();

    private RecordUtils() {
    }

    public static Object[] rowToArray(ResultSet rs, int size) throws SQLException {
        Object[] row = new Object[size];
        for (int i = 0; i < size; ++i) {
            row[i] = rs.getObject(i + 1);
        }
        return row;
    }

    public static void upsertBinlog(Map<Struct, SourceRecord> snapshotRecords, SourceRecord binlogRecord) {
        Struct key = (Struct)binlogRecord.key();
        Struct value = (Struct)binlogRecord.value();
        if (value != null) {
            Envelope.Operation operation = Envelope.Operation.forCode((String)value.getString("op"));
            switch (operation) {
                case CREATE: 
                case UPDATE: {
                    Envelope envelope = Envelope.fromSchema((Schema)binlogRecord.valueSchema());
                    Struct source = value.getStruct("source");
                    Struct after = value.getStruct("after");
                    Instant fetchTs = Instant.ofEpochMilli((Long)source.get("ts_ms"));
                    SourceRecord record = new SourceRecord(binlogRecord.sourcePartition(), binlogRecord.sourceOffset(), binlogRecord.topic(), binlogRecord.kafkaPartition(), binlogRecord.keySchema(), binlogRecord.key(), binlogRecord.valueSchema(), (Object)envelope.read((Object)after, source, fetchTs));
                    snapshotRecords.put(key, record);
                    break;
                }
                case DELETE: {
                    snapshotRecords.remove(key);
                    break;
                }
                case READ: {
                    throw new IllegalStateException(String.format("Binlog record shouldn't use READ operation, the the record is %s.", binlogRecord));
                }
            }
        }
    }

    public static List<SourceRecord> formatMessageTimestamp(Collection<SourceRecord> snapshotRecords) {
        return snapshotRecords.stream().map(record -> {
            Envelope envelope = Envelope.fromSchema((Schema)record.valueSchema());
            Struct value = (Struct)record.value();
            Struct updateAfter = value.getStruct("after");
            Struct source = value.getStruct("source");
            source.put("ts_ms", (Object)0L);
            Instant fetchTs = Instant.ofEpochMilli(value.getInt64("ts_ms"));
            SourceRecord sourceRecord = new SourceRecord(record.sourcePartition(), record.sourceOffset(), record.topic(), record.kafkaPartition(), record.keySchema(), record.key(), record.valueSchema(), (Object)envelope.read((Object)updateAfter, source, fetchTs));
            return sourceRecord;
        }).collect(Collectors.toList());
    }

    public static boolean isWatermarkEvent(SourceRecord record) {
        Optional<SignalEventDispatcher.WatermarkKind> watermarkKind = RecordUtils.getWatermarkKind(record);
        return watermarkKind.isPresent();
    }

    public static boolean isLowWatermarkEvent(SourceRecord record) {
        Optional<SignalEventDispatcher.WatermarkKind> watermarkKind = RecordUtils.getWatermarkKind(record);
        return watermarkKind.isPresent() && watermarkKind.get() == SignalEventDispatcher.WatermarkKind.LOW;
    }

    public static boolean isHighWatermarkEvent(SourceRecord record) {
        Optional<SignalEventDispatcher.WatermarkKind> watermarkKind = RecordUtils.getWatermarkKind(record);
        return watermarkKind.isPresent() && watermarkKind.get() == SignalEventDispatcher.WatermarkKind.HIGH;
    }

    public static boolean isEndWatermarkEvent(SourceRecord record) {
        Optional<SignalEventDispatcher.WatermarkKind> watermarkKind = RecordUtils.getWatermarkKind(record);
        return watermarkKind.isPresent() && watermarkKind.get() == SignalEventDispatcher.WatermarkKind.BINLOG_END;
    }

    public static BinlogOffset getWatermark(SourceRecord watermarkEvent) {
        return RecordUtils.getBinlogPosition(watermarkEvent.sourceOffset());
    }

    public static Long getMessageTimestamp(SourceRecord record) {
        Schema schema = record.valueSchema();
        Struct value = (Struct)record.value();
        if (schema.field("source") == null) {
            return null;
        }
        Struct source = value.getStruct("source");
        if (source.schema().field("ts_ms") == null) {
            return null;
        }
        return source.getInt64("ts_ms");
    }

    public static Long getFetchTimestamp(SourceRecord record) {
        Schema schema = record.valueSchema();
        Struct value = (Struct)record.value();
        if (schema.field("ts_ms") == null) {
            return null;
        }
        return value.getInt64("ts_ms");
    }

    public static boolean isSchemaChangeEvent(SourceRecord sourceRecord) {
        Schema keySchema = sourceRecord.keySchema();
        return keySchema != null && SCHEMA_CHANGE_EVENT_KEY_NAME.equalsIgnoreCase(keySchema.name());
    }

    public static boolean isHeartbeatEvent(SourceRecord record) {
        Schema valueSchema = record.valueSchema();
        return valueSchema != null && SCHEMA_HEARTBEAT_EVENT_KEY_NAME.equalsIgnoreCase(valueSchema.name());
    }

    public static FinishedSnapshotSplitInfo getSnapshotSplitInfo(MySqlSnapshotSplit split, SourceRecord highWatermark) {
        Struct value = (Struct)highWatermark.value();
        String splitId = value.getString("split_id");
        return new FinishedSnapshotSplitInfo(split.getTableId(), splitId, split.getSplitStart(), split.getSplitEnd(), RecordUtils.getBinlogPosition(highWatermark.sourceOffset()));
    }

    public static BinlogOffset getStartingOffsetOfBinlogSplit(List<FinishedSnapshotSplitInfo> finishedSnapshotSplits) {
        BinlogOffset startOffset = finishedSnapshotSplits.isEmpty() ? BinlogOffset.ofEarliest() : finishedSnapshotSplits.get(0).getHighWatermark();
        for (FinishedSnapshotSplitInfo finishedSnapshotSplit : finishedSnapshotSplits) {
            if (!finishedSnapshotSplit.getHighWatermark().isBefore(startOffset)) continue;
            startOffset = finishedSnapshotSplit.getHighWatermark();
        }
        return startOffset;
    }

    public static boolean isDataChangeRecord(SourceRecord record) {
        Schema valueSchema = record.valueSchema();
        Struct value = (Struct)record.value();
        return valueSchema.field("op") != null && value.getString("op") != null;
    }

    public static TableId getTableId(SourceRecord dataRecord) {
        Struct value = (Struct)dataRecord.value();
        Struct source = value.getStruct("source");
        String dbName = source.getString("db");
        String tableName = source.getString("table");
        return new TableId(dbName, null, tableName);
    }

    public static Object[] getSplitKey(RowType splitBoundaryType, SourceRecord dataRecord, SchemaNameAdjuster nameAdjuster) {
        String splitFieldName = nameAdjuster.adjust((String)splitBoundaryType.getFieldNames().get(0));
        Struct key = (Struct)dataRecord.key();
        return new Object[]{key.get(splitFieldName)};
    }

    public static BinlogOffset getBinlogPosition(SourceRecord dataRecord) {
        return RecordUtils.getBinlogPosition(dataRecord.sourceOffset());
    }

    public static BinlogOffset getBinlogPosition(Map<String, ?> offset) {
        HashMap<String, String> offsetStrMap = new HashMap<String, String>();
        for (Map.Entry<String, ?> entry : offset.entrySet()) {
            offsetStrMap.put(entry.getKey(), entry.getValue() == null ? null : entry.getValue().toString());
        }
        return BinlogOffset.builder().setOffsetMap(offsetStrMap).build();
    }

    public static boolean splitKeyRangeContains(Object[] key, Object[] splitKeyStart, Object[] splitKeyEnd) {
        if (splitKeyStart == null && splitKeyEnd == null) {
            return true;
        }
        if (splitKeyStart == null) {
            int[] upperBoundRes = new int[key.length];
            for (int i = 0; i < key.length; ++i) {
                upperBoundRes[i] = RecordUtils.compareObjects(key[i], splitKeyEnd[i]);
            }
            return Arrays.stream(upperBoundRes).anyMatch(value -> value < 0) && Arrays.stream(upperBoundRes).allMatch(value -> value <= 0);
        }
        if (splitKeyEnd == null) {
            int[] lowerBoundRes = new int[key.length];
            for (int i = 0; i < key.length; ++i) {
                lowerBoundRes[i] = RecordUtils.compareObjects(key[i], splitKeyStart[i]);
            }
            return Arrays.stream(lowerBoundRes).allMatch(value -> value >= 0);
        }
        int[] lowerBoundRes = new int[key.length];
        int[] upperBoundRes = new int[key.length];
        for (int i = 0; i < key.length; ++i) {
            lowerBoundRes[i] = RecordUtils.compareObjects(key[i], splitKeyStart[i]);
            upperBoundRes[i] = RecordUtils.compareObjects(key[i], splitKeyEnd[i]);
        }
        return Arrays.stream(lowerBoundRes).anyMatch(value -> value >= 0) && Arrays.stream(upperBoundRes).anyMatch(value -> value < 0) && Arrays.stream(upperBoundRes).allMatch(value -> value <= 0);
    }

    private static int compareObjects(Object o1, Object o2) {
        if (o1 instanceof Comparable && o1.getClass().equals(o2.getClass())) {
            return ((Comparable)o1).compareTo(o2);
        }
        if (RecordUtils.isNumericObject(o1) && RecordUtils.isNumericObject(o2)) {
            return RecordUtils.toBigDecimal(o1).compareTo(RecordUtils.toBigDecimal(o2));
        }
        return o1.toString().compareTo(o2.toString());
    }

    private static boolean isNumericObject(Object obj) {
        return obj instanceof Byte || obj instanceof Short || obj instanceof Integer || obj instanceof Long || obj instanceof Float || obj instanceof Double || obj instanceof BigInteger || obj instanceof BigDecimal;
    }

    private static BigDecimal toBigDecimal(Object numericObj) {
        return new BigDecimal(numericObj.toString());
    }

    public static HistoryRecord getHistoryRecord(SourceRecord schemaRecord) throws IOException {
        Struct value = (Struct)schemaRecord.value();
        String historyRecordStr = value.getString("historyRecord");
        return new HistoryRecord(DOCUMENT_READER.read(historyRecordStr));
    }

    private static Optional<SignalEventDispatcher.WatermarkKind> getWatermarkKind(SourceRecord record) {
        if (record.valueSchema() != null && "io.debezium.connector.flink.cdc.embedded.watermark.value".equals(record.valueSchema().name())) {
            Struct value = (Struct)record.value();
            return Optional.of(SignalEventDispatcher.WatermarkKind.valueOf(value.getString("watermark_kind")));
        }
        return Optional.empty();
    }
}

