package com.bringspring.flink.handler.service.impl;

import com.bringspring.common.util.StringUtil;
import com.bringspring.constant.NetConstant;
import com.bringspring.flink.handler.service.FlinkHandleService;
import com.bringspring.flink.handler.service.environment.FlinkEnvironmentHandle;
import com.bringspring.flink.trans.SqlType;
import com.bringspring.flink.utils.FlinkSqlUtil;
import org.apache.calcite.avatica.util.Casing;
import org.apache.calcite.avatica.util.Quoting;
import org.apache.calcite.sql.parser.SqlParser;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.sql.parser.impl.FlinkSqlParserImpl;
import org.apache.flink.sql.parser.validate.FlinkSqlConformance;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl;
import org.apache.flink.util.FlinkException;
import org.springframework.stereotype.Service;

/**
 * Default {@link FlinkHandleService} implementation: checks the syntax of Flink SQL text and
 * submits SQL scripts to a remote or a local Flink table environment.
 */
@Service
public class FlinkHandleServiceImpl implements FlinkHandleService {

    /** Port used by {@link #submitRemote} when the host string carries no port (Flink's default REST port). */
    private static final int DEFAULT_REMOTE_PORT = 8081;

    /** Separator used to split a SQL script into individual statements. */
    private static final String STATEMENT_SEPARATOR = ";";

    /**
     * Checks the syntax of the given SQL text by parsing it with the Flink SQL parser
     * (back-tick quoting, unquoted identifiers lower-cased, default Flink conformance).
     * The statements are only parsed, never executed.
     *
     * @param str the SQL text to validate; may contain several statements
     * @return the literal {@code "验证成功"} when the text parses
     * @throws Exception when parsing fails; the message starts with {@code "出现语法错误："} and the
     *     original parser exception is attached as the cause
     */
    @Override
    public String check(String str) throws Exception {
        try {
            SqlParser.create(
                            str,
                            SqlParser.config()
                                    .withParserFactory(FlinkSqlParserImpl.FACTORY)
                                    .withQuoting(Quoting.BACK_TICK)
                                    .withUnquotedCasing(Casing.TO_LOWER)
                                    .withQuotedCasing(Casing.UNCHANGED)
                                    .withConformance(FlinkSqlConformance.DEFAULT))
                    .parseStmtList();
        } catch (Exception e) {
            // Keep the parser exception as the cause so the full diagnostic is not lost.
            throw new Exception(describeSyntaxError(e), e);
        }
        return "验证成功";
    }

    /**
     * Executes the statements of a SQL script on a remote Flink cluster.
     *
     * @param str the cluster address, split on {@link NetConstant#COLON}: the first segment is the
     *     host and the optional second segment is the port ({@value #DEFAULT_REMOTE_PORT} when absent)
     * @param str2 the SQL script; statements are separated by {@code ;}
     * @return the IDs of the jobs started by the script, concatenated without a separator
     *     (empty when no statement started a job)
     * @throws FlinkException when the address is empty, has no host segment, or has a
     *     non-numeric port
     */
    @Override
    public String submitRemote(String str, String str2) throws FlinkException {
        if (StringUtil.isEmpty(str)) {
            throw new FlinkException("Flink 主机信息为空 ");
        }
        String[] hostAndPort = str.split(NetConstant.COLON);
        if (hostAndPort.length == 0) {
            // The address consisted only of separators, so there is no host to connect to.
            throw new FlinkException("Flink 主机信息为空 ");
        }
        // The host is always the first segment, even when no port was supplied.
        String host = hostAndPort[0];
        int port = DEFAULT_REMOTE_PORT;
        if (hostAndPort.length >= 2) {
            try {
                port = Integer.parseInt(hostAndPort[1].trim());
            } catch (NumberFormatException e) {
                throw new FlinkException("Invalid Flink port: " + hostAndPort[1], e);
            }
        }
        StreamTableEnvironment remoteEnvironment =
                FlinkEnvironmentHandle.createRemoteEnvironment(host, Integer.valueOf(port));
        return executeStatements(remoteEnvironment, str2);
    }

    /**
     * Executes the statements of a SQL script in a local Flink table environment.
     *
     * @param str the SQL script; statements are separated by {@code ;}
     * @return the IDs of the jobs started by the script, concatenated without a separator
     *     (empty when no statement started a job)
     */
    @Override
    public String submitLocal(String str) {
        return executeStatements(FlinkEnvironmentHandle.createLocalEnvironment(), str);
    }

    /**
     * Runs every statement of the script on the given environment, in order, and collects the
     * IDs of the jobs started by INSERT/SELECT/SHOW/DESCRIBE/DESC statements.
     */
    private static String executeStatements(StreamTableEnvironment tableEnvironment, String script) {
        StringBuilder jobIds = new StringBuilder();
        for (String statement : FlinkSqlUtil.getStatements(script, STATEMENT_SEPARATOR)) {
            if (mayStartJob(FlinkSqlUtil.getOperationType(statement))) {
                // Not every such statement actually submits a job (e.g. SHOW, DESCRIBE), in which
                // case Flink returns an empty Optional; only record an ID when one exists.
                tableEnvironment
                        .executeSql(statement)
                        .getJobClient()
                        .ifPresent(jobClient -> jobIds.append(jobClient.getJobID().toString()));
            } else {
                tableEnvironment.executeSql(statement);
            }
        }
        return jobIds.toString();
    }

    /** Tells whether a statement of the given type is one whose job ID should be reported. */
    private static boolean mayStartJob(SqlType operationType) {
        return operationType.equals(SqlType.INSERT)
                || operationType.equals(SqlType.SELECT)
                || operationType.equals(SqlType.SHOW)
                || operationType.equals(SqlType.DESCRIBE)
                || operationType.equals(SqlType.DESC);
    }

    /**
     * Builds the user-facing syntax error message from a parser exception: the text after the
     * exception class name, cut before the first {@code "Was"} (the start of the parser's
     * "Was expecting ..." list), plus the following {@code ": "}-separated segment when present.
     */
    private static String describeSyntaxError(Exception e) {
        String text = e.toString();
        String[] segments = text.split(": ");
        if (segments.length < 2) {
            // No "<class>: <message>" shape (e.g. an exception without a message): report it whole.
            return "出现语法错误：" + text;
        }
        String detail = segments[1];
        int expectationStart = detail.indexOf("Was");
        String summary = "出现语法错误：" + (expectationStart >= 0 ? detail.substring(0, expectationStart) : detail);
        return segments.length > 2 ? summary + " : " + segments[2] : summary;
    }
}
