/*
 * Decompiled with CFR 0.152.
 */
package com.bringspring.flink.handler.service.impl;

import com.bringspring.common.util.StringUtils;
import com.bringspring.daap.capture.model.config.CaptureConfigModel;
import com.bringspring.flink.handler.service.FlinkHandleService;
import com.bringspring.flink.handler.service.environment.FlinkEnvironmentHandle;
import com.bringspring.flink.trans.SqlType;
import com.bringspring.flink.utils.FlinkSqlUtil;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import java.util.Optional;
import java.util.Properties;
import org.apache.calcite.avatica.util.Casing;
import org.apache.calcite.avatica.util.Quoting;
import org.apache.calcite.sql.SqlNodeList;
import org.apache.calcite.sql.parser.SqlParser;
import org.apache.calcite.sql.validate.SqlConformance;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.sql.parser.impl.FlinkSqlParserImpl;
import org.apache.flink.sql.parser.validate.FlinkSqlConformance;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl;
import org.apache.flink.table.delegation.Parser;
import org.apache.flink.table.delegation.Planner;
import org.apache.flink.util.FlinkException;
import org.springframework.stereotype.Service;

/**
 * Default {@link FlinkHandleService} implementation: validates Flink SQL scripts and submits
 * SQL- or API-defined jobs to a local or remote Flink environment.
 */
@Service
public class FlinkHandleServiceImpl
implements FlinkHandleService {
    /** Message returned by {@link #check(String)} when parsing succeeds ("validation succeeded"). */
    private static final String CHECK_SUCCESS_MESSAGE = "\u9a8c\u8bc1\u6210\u529f";

    /** Prefix of the message returned by {@link #check(String)} on failure ("syntax error occurred:"). */
    private static final String CHECK_ERROR_PREFIX = "\u51fa\u73b0\u8bed\u6cd5\u9519\u8bef\uff1a";

    /** Flink configuration key that carries the name of the submitted pipeline. */
    private static final String PIPELINE_NAME_KEY = "pipeline.name";

    /**
     * Parses {@code jobSource} with the Flink SQL parser and reports whether it is syntactically valid.
     *
     * <p>The outcome is always reported through the return value: the success message when the
     * whole statement list parses, otherwise the error prefix followed by an excerpt of the parser
     * exception. The previous implementation threw inside {@code catch} but then returned from
     * {@code finally}, which discarded the exception; worse, if the message excerpt could not be
     * extracted the success message was returned for invalid SQL. Both paths now return an error text.
     *
     * @param jobSource the SQL script to validate
     * @return the success message, or a description of the first syntax error
     * @throws Exception declared for interface compatibility; parse failures are returned, not thrown
     */
    @Override
    public String check(String jobSource) throws Exception {
        SqlParser.Config parserConfig = SqlParser.config()
                .withParserFactory(FlinkSqlParserImpl.FACTORY)
                .withQuoting(Quoting.BACK_TICK)
                .withUnquotedCasing(Casing.TO_LOWER)
                .withQuotedCasing(Casing.UNCHANGED)
                .withConformance(FlinkSqlConformance.DEFAULT);
        SqlParser sqlParser = SqlParser.create(jobSource, parserConfig);
        try {
            // Only the syntax is checked; the parsed statement list itself is not needed.
            sqlParser.parseStmtList();
            return CHECK_SUCCESS_MESSAGE;
        }
        catch (Exception e) {
            // No logging facade is imported by this file, so keep the original stderr diagnostics.
            e.printStackTrace();
            return describeSyntaxError(e);
        }
    }

    /**
     * Builds the user-facing syntax error text from a parser exception.
     *
     * <p>{@code e.toString()} is split on {@code ": "}. The second segment, cut at the first
     * occurrence of {@code "Was"}, is the main message; a third segment, when present, is appended
     * after {@code " : "}. If the text contains no {@code ": "} at all, the whole text is used
     * instead of failing with an index error.
     */
    private static String describeSyntaxError(Exception e) {
        String text = e.toString();
        String[] segments = text.split(": ");
        String detail = segments.length > 1 ? segments[1] : text;
        int wasIndex = detail.indexOf("Was");
        if (wasIndex >= 0) {
            detail = detail.substring(0, wasIndex);
        }
        StringBuilder message = new StringBuilder(CHECK_ERROR_PREFIX).append(detail);
        if (segments.length > 2) {
            message.append(" : ").append(segments[2]);
        }
        return message.toString();
    }

    /**
     * Executes {@code syncDataSql + jobSource} statement by statement on a remote Flink cluster.
     *
     * @param jobMangerHost host passed to the remote table environment factory; must not be empty
     * @param port port passed to the remote table environment factory
     * @param jobSource SQL appended after {@code syncDataSql}
     * @param jobName value written to {@code pipeline.name} before each job-producing statement
     * @param syncDataSql SQL placed before {@code jobSource}
     * @return the IDs of all launched jobs, concatenated without a separator
     * @throws FlinkException if {@code jobMangerHost} is empty
     */
    @Override
    public String submitRemoteStreamBySql(String jobMangerHost, int port, String jobSource, String jobName, String syncDataSql) throws FlinkException {
        if (StringUtils.isEmpty(jobMangerHost)) {
            throw new FlinkException("Flink \u4e3b\u673a\u4fe1\u606f\u4e3a\u7a7a ");
        }
        StreamTableEnvironment tableEnvironment = FlinkEnvironmentHandle.createRemoteTableEnvironment(jobMangerHost, port);
        String[] statements = FlinkSqlUtil.getStatements(syncDataSql + jobSource, "\\);");
        return executeStatements(tableEnvironment, statements, jobName);
    }

    /**
     * Executes {@code syncDataSql + jobSource} statement by statement on a local table environment.
     *
     * @param jobSource SQL appended after {@code syncDataSql}
     * @param jobaName value written to {@code pipeline.name} before each job-producing statement
     * @param syncDataSql SQL placed before {@code jobSource}
     * @return the IDs of all launched jobs, concatenated without a separator
     */
    @Override
    public String submitLocalStreamBySql(String jobSource, String jobaName, String syncDataSql) {
        StreamTableEnvironment tableEnvironment = FlinkEnvironmentHandle.createLocalTableEnvironment();
        String[] statements = FlinkSqlUtil.getStatements(syncDataSql + jobSource, "\\);");
        return executeStatements(tableEnvironment, statements, jobaName);
    }

    /**
     * Executes the {@code ;}-separated statements of {@code jobSource} on a local table environment
     * without overriding the pipeline name.
     *
     * <p>NOTE(review): this uses the same local environment factory as the streaming variant;
     * confirm whether a batch-mode environment was intended.
     *
     * @param jobSource the SQL script to run
     * @return the IDs of all launched jobs, concatenated without a separator
     */
    @Override
    public String submitLocalBatchBySql(String jobSource) {
        StreamTableEnvironment tableEnvironment = FlinkEnvironmentHandle.createLocalTableEnvironment();
        String[] statements = FlinkSqlUtil.getStatements(jobSource, ";");
        return executeStatements(tableEnvironment, statements, null);
    }

    /**
     * Runs every statement in order and collects the IDs of the jobs they launch.
     *
     * <p>For INSERT / SELECT / SHOW / DESCRIBE / DESC statements the pipeline name is set first
     * (when {@code pipelineName} is non-null) and the job ID is recorded if the result exposes a
     * job client. {@code TableResult#getJobClient()} is an {@code Optional}; it is consulted with
     * {@code ifPresent} so that a statement which launches no job no longer fails with
     * {@code NoSuchElementException}. All other statements are simply executed.
     *
     * @param tableEnvironment environment the statements are executed on
     * @param statements statements in execution order
     * @param pipelineName name applied to launched jobs, or {@code null} to leave it untouched
     * @return the collected job IDs concatenated without a separator (empty if no job was launched)
     */
    private static String executeStatements(StreamTableEnvironment tableEnvironment, String[] statements, String pipelineName) {
        // NOTE(review): IDs are appended back to back, as before; with more than one job the
        // result cannot be split reliably. Kept unchanged because callers may depend on the format.
        StringBuilder jobIds = new StringBuilder();
        for (String statement : statements) {
            if (!mayLaunchJob(FlinkSqlUtil.getOperationType(statement))) {
                tableEnvironment.executeSql(statement);
                continue;
            }
            if (pipelineName != null) {
                // Re-applied before every such statement so an earlier statement cannot leave
                // a different name in effect.
                Configuration configuration = tableEnvironment.getConfig().getConfiguration();
                configuration.setString(PIPELINE_NAME_KEY, pipelineName);
            }
            TableResult tableResult = tableEnvironment.executeSql(statement);
            Optional<JobClient> jobClient = tableResult.getJobClient();
            jobClient.ifPresent(client -> jobIds.append(client.getJobID().toString()));
        }
        return jobIds.toString();
    }

    /** Returns whether the statement type is one whose result is inspected for a job ID. */
    private static boolean mayLaunchJob(SqlType operationType) {
        return operationType.equals(SqlType.INSERT)
                || operationType.equals(SqlType.SELECT)
                || operationType.equals(SqlType.SHOW)
                || operationType.equals(SqlType.DESCRIBE)
                || operationType.equals(SqlType.DESC);
    }

    /**
     * Submits a MySQL CDC source that prints change events, and returns the job ID.
     *
     * <p>NOTE(review): {@code configModel} is currently ignored. The cluster address, database
     * host, credentials, database and table below are hard-coded, including a plaintext password
     * committed to source. They should be taken from {@code configModel} or external configuration;
     * the accessors of {@code CaptureConfigModel} are not visible here, so the literals are left
     * as they were.
     *
     * @param configModel capture configuration (unused at present)
     * @return the ID of the executed job
     * @throws Exception if job execution fails
     */
    @Override
    public String submitRemoteByApi(CaptureConfigModel configModel) throws Exception {
        StreamExecutionEnvironment remoteEnvironment = FlinkEnvironmentHandle.createRemoteEnvironment("10.70.10.55", 8082);
        Properties jdbcProperties = new Properties();
        jdbcProperties.setProperty("autoReconnect", "true");
        MySqlSource<String> source = MySqlSource.<String>builder()
                .serverTimeZone("UTC")
                .hostname("10.70.10.63")
                .port(3307)
                .username("root")
                .password("rongke@2022")
                .databaseList(new String[]{"jsbos_logistics_standard"})
                .tableList(new String[]{"jsbos_logistics_standard.base_user"})
                .startupOptions(StartupOptions.latest())
                .deserializer(new JsonDebeziumDeserializationSchema())
                .jdbcProperties(jdbcProperties)
                .build();
        remoteEnvironment.fromSource(source, WatermarkStrategy.noWatermarks(), "MySQL Source").print().setParallelism(1);
        JobID jobID = remoteEnvironment.execute("test").getJobID();
        return jobID.toString();
    }

    /**
     * Not implemented yet.
     *
     * @param configModel capture configuration (unused)
     * @return always {@code null}
     */
    @Override
    public String submitLocalByApi(CaptureConfigModel configModel) {
        return null;
    }
}

