/*
 * Decompiled with CFR 0.152.
 */
package com.bringspring.daap.capture.service.impl;

import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil;
import cn.hutool.http.HttpUtil;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.baomidou.dynamic.datasource.annotation.DSTransactional;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.core.conditions.Wrapper;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.bringspring.common.exception.DataException;
import com.bringspring.common.util.JsonUtil;
import com.bringspring.common.util.RandomUtil;
import com.bringspring.common.util.StringUtils;
import com.bringspring.daap.capture.constant.EnabledMarkEnum;
import com.bringspring.daap.capture.entity.DaapCaptureConfigEntity;
import com.bringspring.daap.capture.entity.DaapCaptureSinkEntity;
import com.bringspring.daap.capture.entity.DaapCaptureSinkFieldEntity;
import com.bringspring.daap.capture.entity.DaapCaptureSinkTableEntity;
import com.bringspring.daap.capture.entity.DaapCaptureSourceEntity;
import com.bringspring.daap.capture.entity.DaapCaptureSourceFieldEntity;
import com.bringspring.daap.capture.entity.DaapCaptureSourceTableEntity;
import com.bringspring.daap.capture.mapper.DaapCaptureConfigMapper;
import com.bringspring.daap.capture.model.config.CaptureConfigDetail;
import com.bringspring.daap.capture.model.config.CaptureConfigModel;
import com.bringspring.daap.capture.model.config.DaapCaptureConfigPagination;
import com.bringspring.daap.capture.model.mapping.MappingTableModel;
import com.bringspring.daap.capture.model.mapping.MappingTableSqlModel;
import com.bringspring.daap.capture.model.sink.SinkModel;
import com.bringspring.daap.capture.model.source.SourceModel;
import com.bringspring.daap.capture.model.sourcefield.SourceFieldModel;
import com.bringspring.daap.capture.service.DaapCaptureConfigService;
import com.bringspring.daap.capture.service.DaapCaptureSinkFieldService;
import com.bringspring.daap.capture.service.DaapCaptureSinkService;
import com.bringspring.daap.capture.service.DaapCaptureSinkTableService;
import com.bringspring.daap.capture.service.DaapCaptureSourceFieldService;
import com.bringspring.daap.capture.service.DaapCaptureSourceService;
import com.bringspring.daap.capture.service.DaapCaptureSourceTableService;
import com.bringspring.daap.capture.service.DaapCaptureTableMappingService;
import com.bringspring.daap.job.entity.DaapConnectorEntity;
import com.bringspring.daap.job.entity.DaapFlinkCheckpointEntity;
import com.bringspring.daap.job.entity.DaapFlinkExcepointEntity;
import com.bringspring.daap.job.entity.DaapFlinkManagerInfoEntity;
import com.bringspring.daap.job.entity.DaapFlinkSavepointEntity;
import com.bringspring.daap.job.entity.DaapJobClusterEntity;
import com.bringspring.daap.job.model.daapconnector.FileSystemWithProperties;
import com.bringspring.daap.job.model.daapconnector.MysqlCdcWithProperties;
import com.bringspring.daap.job.model.daapconnector.MysqlJdbcWithProperties;
import com.bringspring.daap.job.service.DaapConnectorService;
import com.bringspring.daap.job.service.DaapFlinkCheckpointService;
import com.bringspring.daap.job.service.DaapFlinkExcepointService;
import com.bringspring.daap.job.service.DaapFlinkManagerInfoService;
import com.bringspring.daap.job.service.DaapFlinkSavepointService;
import com.bringspring.daap.job.service.DaapJobClusterService;
import com.bringspring.daap.utils.JdbcUtils;
import com.bringspring.flink.handler.service.environment.FlinkEnvironmentHandle;
import com.bringspring.system.base.exception.BaseException;
import java.io.Serializable;
import java.lang.reflect.Field;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamStatementSet;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class DaapCaptureConfigServiceImpl
extends ServiceImpl<DaapCaptureConfigMapper, DaapCaptureConfigEntity>
implements DaapCaptureConfigService {
    // Collaborating services, field-injected by Spring through @Autowired.

    // Capture source side: the source definition, its tables and their fields.
    @Autowired
    private DaapCaptureSourceService daapCaptureSourceService;
    @Autowired
    private DaapCaptureSourceTableService daapCaptureSourceTableService;
    @Autowired
    private DaapCaptureSourceFieldService daapCaptureSourceFieldService;
    // Capture sink side: the sink definition, its tables and their fields.
    @Autowired
    private DaapCaptureSinkService daapCaptureSinkService;
    @Autowired
    private DaapCaptureSinkTableService daapCaptureSinkTableService;
    @Autowired
    private DaapCaptureSinkFieldService daapCaptureSinkFieldService;
    // Supplies the source/sink table mapping models of a configuration.
    @Autowired
    private DaapCaptureTableMappingService daapCaptureTableMappingService;
    // Looked up by a configuration's flinkResourceId.
    @Autowired
    private DaapJobClusterService daapJobClusterService;
    // Resolves the connector referenced by a capture source or sink.
    @Autowired
    private DaapConnectorService daapConnectorService;
    // Per-job bookkeeping records (savepoints, checkpoints, exceptions, manager info).
    @Autowired
    private DaapFlinkSavepointService daapFlinkSavepointService;
    @Autowired
    private DaapFlinkCheckpointService daapFlinkCheckpointService;
    @Autowired
    private DaapFlinkExcepointService daapFlinkExcepointService;
    @Autowired
    private DaapFlinkManagerInfoService daapFlinkManagerInfoService;

    /**
     * Loads a capture configuration and attaches its source, sink and table mappings.
     *
     * @param id id of the capture configuration
     * @return the assembled model, or {@code null} when no configuration has this id
     * @throws DataException declared by the service interface
     */
    @Override
    public CaptureConfigModel getCaptureConfigInfo(String id) throws DataException {
        DaapCaptureConfigEntity captureConfig = this.getInfo(id);
        if (captureConfig == null) {
            return null;
        }
        // The model is created by the conversion itself; no placeholder instance is needed.
        CaptureConfigModel configModel = (CaptureConfigModel) JsonUtil.getJsonToBean(captureConfig, CaptureConfigModel.class);

        DaapCaptureSourceEntity captureSource = this.daapCaptureSourceService.getCaptureSourceByConfigId(id);
        configModel.setCaptureSource((SourceModel) JsonUtil.getJsonToBean(captureSource, SourceModel.class));

        DaapCaptureSinkEntity captureSink = this.daapCaptureSinkService.getSinkByConfigId(id);
        configModel.setCaptureSink((SinkModel) JsonUtil.getJsonToBean(captureSink, SinkModel.class));

        List<MappingTableModel> mappingTables = this.daapCaptureTableMappingService.getMappingTableFieldModels(id);
        configModel.setMappingTables(mappingTables);
        return configModel;
    }

    /**
     * Loads a capture configuration with its source, sink, both connectors, the table
     * mappings and the referenced Flink resource.
     *
     * @param id id of the capture configuration
     * @return the assembled detail, or {@code null} when no configuration has this id
     *         (same convention as {@code getCaptureConfigInfo})
     * @throws DataException declared by the service interface
     */
    @Override
    public CaptureConfigDetail getCaptureConfigDetail(String id) throws DataException {
        DaapCaptureConfigEntity captureConfig = this.getInfo(id);
        if (captureConfig == null) {
            return null;
        }
        CaptureConfigDetail detail = (CaptureConfigDetail) JsonUtil.getJsonToBean(captureConfig, CaptureConfigDetail.class);

        // A configuration without a source/sink row is returned without that part
        // instead of failing with a NullPointerException on the connector lookup.
        DaapCaptureSourceEntity captureSource = this.daapCaptureSourceService.getCaptureSourceByConfigId(id);
        if (captureSource != null) {
            detail.setCaptureSource((SourceModel) JsonUtil.getJsonToBean(captureSource, SourceModel.class));
            detail.setSourceConnector(this.daapConnectorService.getInfo(captureSource.getConnectorId()));
        }

        DaapCaptureSinkEntity captureSink = this.daapCaptureSinkService.getSinkByConfigId(id);
        if (captureSink != null) {
            detail.setCaptureSink((SinkModel) JsonUtil.getJsonToBean(captureSink, SinkModel.class));
            detail.setSinkConnector(this.daapConnectorService.getInfo(captureSink.getConnectorId()));
        }

        List<MappingTableModel> mappingTables = this.daapCaptureTableMappingService.getMappingTableFieldModels(id);
        detail.setMappingTables(mappingTables);

        DaapJobClusterEntity flinkResource = this.daapJobClusterService.getInfo(captureConfig.getFlinkResourceId());
        detail.setFlinkResource(flinkResource);
        return detail;
    }

    /**
     * Queries {@code http://flinkHost:port/jobs/{jobId}} and, for every vertex of the job,
     * the record-count metrics of its {@code ConstraintEnforcer} operators.
     *
     * @param jobId     job id used in the REST path
     * @param flinkHost host of the REST endpoint
     * @param port      port of the REST endpoint
     * @return a JSON object holding {@code "duration"} plus one entry per vertex, keyed by the
     *         vertex name up to the first {@code '['}; each entry maps metric name to value.
     *         Empty when the job response cannot be parsed.
     */
    @Override
    public JSONObject getFlinkJobDetail(String jobId, String flinkHost, int port) {
        // Built once; every request below is relative to this job URL.
        String jobUrl = "http://" + flinkHost + ":" + port + "/jobs/" + jobId;
        JSONObject result = new JSONObject();
        JSONObject job = JSONObject.parseObject(HttpUtil.get(jobUrl));
        if (job == null) {
            return result;
        }
        result.put("duration", job.getString("duration"));
        JSONArray vertices = job.getJSONArray("vertices");
        if (vertices == null) {
            // e.g. an error payload without a "vertices" array
            return result;
        }
        for (Object vertex : vertices) {
            JSONObject vertexJson = (JSONObject) vertex;
            String metricsUrl = jobUrl + "/vertices/" + vertexJson.getString("id") + "/metrics";
            String vertexName = StringUtils.substringBefore(vertexJson.getString("name"), "[");
            result.put(vertexName, this.fetchConstraintEnforcerRecordMetrics(metricsUrl));
        }
        return result;
    }

    /**
     * Lists the metrics available at {@code metricsUrl}, keeps the ConstraintEnforcer
     * numRecordsIn/numRecordsOut ones (including their PerSecond variants) and fetches their values.
     */
    private JSONObject fetchConstraintEnforcerRecordMetrics(String metricsUrl) {
        JSONObject metrics = new JSONObject();
        JSONArray available = JSONArray.parseArray(HttpUtil.get(metricsUrl));
        if (available == null) {
            return metrics;
        }
        // "numRecordsInPerSecond"/"numRecordsOutPerSecond" contain the shorter names,
        // so two contains() checks cover all four metrics.
        List<String> wanted = available.stream()
                .map(metric -> ((JSONObject) metric).getString("id"))
                .filter(id -> id != null
                        && id.contains("ConstraintEnforcer")
                        && (id.contains("numRecordsIn") || id.contains("numRecordsOut")))
                .collect(Collectors.toList());
        if (wanted.isEmpty()) {
            // Nothing to ask for; avoid sending an empty "?get=" request.
            return metrics;
        }
        JSONArray values = JSONArray.parseArray(HttpUtil.get(metricsUrl + "?get=" + String.join(",", wanted)));
        if (values == null) {
            return metrics;
        }
        for (Object entry : values) {
            JSONObject metric = (JSONObject) entry;
            String value = metric.getString("value");
            if (value == null) {
                // Entry without a value: skip instead of failing on value.contains(".")
                continue;
            }
            // Keep only the part after the last "]." of the metric id.
            String metricId = StringUtils.substringAfterLast(metric.getString("id"), "].");
            // Fractional values are shown with one decimal place, others verbatim.
            String shown = value.contains(".") ? String.format("%.1f", metric.getFloatValue("value")) : value;
            metrics.put(metricId, shown);
        }
        return metrics;
    }

    /**
     * Returns one page of capture configurations, optionally filtered by name (LIKE) and
     * enabled mark (equality), and stores the total row count on the pagination object.
     *
     * <p>Sorting: without {@code sidx} rows are ordered by id descending. Otherwise
     * {@code sidx} must name a field of {@link DaapCaptureConfigEntity} carrying a
     * {@link TableField} annotation; its column is ordered ascending when {@code sort}
     * is "asc" (case-insensitive) and descending otherwise. An unresolvable {@code sidx}
     * is logged and results in no explicit ordering.
     *
     * @param daapCaptureConfigPagination filter, sort and paging parameters
     * @return the records of the requested page
     */
    @Override
    public List<DaapCaptureConfigEntity> getList(DaapCaptureConfigPagination daapCaptureConfigPagination) {
        QueryWrapper<DaapCaptureConfigEntity> queryWrapper = new QueryWrapper<>();
        if (StringUtils.isNotEmpty(daapCaptureConfigPagination.getName())) {
            queryWrapper.lambda().like(DaapCaptureConfigEntity::getName, daapCaptureConfigPagination.getName());
        }
        if (StringUtils.isNotEmpty(daapCaptureConfigPagination.getEnabledMark())) {
            queryWrapper.lambda().eq(DaapCaptureConfigEntity::getEnabledMark, daapCaptureConfigPagination.getEnabledMark());
        }

        String sidx = daapCaptureConfigPagination.getSidx();
        if (StringUtils.isEmpty(sidx)) {
            queryWrapper.lambda().orderByDesc(DaapCaptureConfigEntity::getId);
        } else {
            String column = this.resolveSortColumn(sidx);
            if (column != null) {
                // equalsIgnoreCase also tolerates a missing sort direction (treated as descending).
                if ("asc".equalsIgnoreCase(daapCaptureConfigPagination.getSort())) {
                    queryWrapper.orderByAsc(column);
                } else {
                    queryWrapper.orderByDesc(column);
                }
            }
        }

        Page<DaapCaptureConfigEntity> pageRequest =
                new Page<>(daapCaptureConfigPagination.getCurrentPage(), daapCaptureConfigPagination.getPageSize());
        IPage<DaapCaptureConfigEntity> page = this.page(pageRequest, queryWrapper);
        return daapCaptureConfigPagination.setData(page.getRecords(), page.getTotal());
    }

    /**
     * Maps an entity field name to the column declared in its {@link TableField} annotation.
     * Only declared entity fields are accepted, so the value used in ORDER BY never comes
     * straight from the request.
     *
     * @return the column name, or {@code null} when the field or its annotation is missing
     */
    private String resolveSortColumn(String sidx) {
        try {
            Field field = DaapCaptureConfigEntity.class.getDeclaredField(sidx);
            TableField tableField = field.getAnnotation(TableField.class);
            if (tableField == null || tableField.value().isEmpty()) {
                this.log.warn("Sort field '" + sidx + "' has no @TableField column; ordering skipped");
                return null;
            }
            return tableField.value();
        } catch (NoSuchFieldException e) {
            this.log.warn("Unknown sort field '" + sidx + "'; ordering skipped");
            return null;
        }
    }

    /**
     * Looks up a capture configuration by its id.
     *
     * @param id id of the capture configuration
     * @return the result of {@code getOne} for the id filter
     */
    @Override
    public DaapCaptureConfigEntity getInfo(String id) {
        // The wrapper must be parameterized: with a raw QueryWrapper the method
        // reference below has no functional-interface target and does not compile.
        QueryWrapper<DaapCaptureConfigEntity> queryWrapper = new QueryWrapper<>();
        queryWrapper.lambda().eq(DaapCaptureConfigEntity::getId, id);
        return this.getOne(queryWrapper);
    }

    /**
     * Looks up the capture configuration whose {@code jobId} equals the given value.
     *
     * @param jobId job id stored on the configuration
     * @return the result of {@code getOne} for the job-id filter
     */
    @Override
    public DaapCaptureConfigEntity getCaptureConfigByJobId(String jobId) {
        // The wrapper must be parameterized: with a raw QueryWrapper the method
        // reference below has no functional-interface target and does not compile.
        QueryWrapper<DaapCaptureConfigEntity> queryWrapper = new QueryWrapper<>();
        queryWrapper.lambda().eq(DaapCaptureConfigEntity::getJobId, jobId);
        return this.getOne(queryWrapper);
    }

    /**
     * Persists a new capture configuration together with its source, sink and, for every
     * mapping, the source/sink table rows and their field rows. All rows get fresh ids.
     *
     * @param captureConfigModel the configuration to store
     * @return the id generated for the new configuration
     */
    @Override
    @DSTransactional
    public String create(CaptureConfigModel captureConfigModel) {
        DaapCaptureConfigEntity configEntity = (DaapCaptureConfigEntity) JsonUtil.getJsonToBean(captureConfigModel, DaapCaptureConfigEntity.class);
        configEntity.setId(RandomUtil.uuId());
        this.save(configEntity);
        String configId = configEntity.getId();

        DaapCaptureSourceEntity sourceEntity = (DaapCaptureSourceEntity) JsonUtil.getJsonToBean(captureConfigModel.getCaptureSource(), DaapCaptureSourceEntity.class);
        sourceEntity.setConfigId(configId);
        sourceEntity.setId(RandomUtil.uuId());
        this.daapCaptureSourceService.save(sourceEntity);

        DaapCaptureSinkEntity sinkEntity = (DaapCaptureSinkEntity) JsonUtil.getJsonToBean(captureConfigModel.getCaptureSink(), DaapCaptureSinkEntity.class);
        sinkEntity.setConfigId(configId);
        sinkEntity.setId(RandomUtil.uuId());
        this.daapCaptureSinkService.save(sinkEntity);

        List<MappingTableModel> mappingTables = captureConfigModel.getMappingTables();
        if (mappingTables == null) {
            // A configuration without mappings is stored as-is.
            return configId;
        }
        for (MappingTableModel mappingTable : mappingTables) {
            DaapCaptureSourceTableEntity sourceTable = (DaapCaptureSourceTableEntity) JsonUtil.getJsonToBean(mappingTable.getSourceTable(), DaapCaptureSourceTableEntity.class);
            sourceTable.setId(RandomUtil.uuId());
            sourceTable.setConfigId(configId);
            this.daapCaptureSourceTableService.save(sourceTable);
            if (ObjectUtil.isNotNull(mappingTable.getSourceTable().getSourceFieldModels())) {
                // Typed list: iterating a raw List with an entity-typed loop variable does not compile.
                List<DaapCaptureSourceFieldEntity> sourceFields = JsonUtil.getJsonToList(mappingTable.getSourceTable().getSourceFieldModels(), DaapCaptureSourceFieldEntity.class);
                for (DaapCaptureSourceFieldEntity sourceField : sourceFields) {
                    sourceField.setId(RandomUtil.uuId());
                    sourceField.setSourceTableId(sourceTable.getId());
                }
                this.daapCaptureSourceFieldService.saveBatch(sourceFields);
            }

            DaapCaptureSinkTableEntity sinkTable = (DaapCaptureSinkTableEntity) JsonUtil.getJsonToBean(mappingTable.getSinkTable(), DaapCaptureSinkTableEntity.class);
            sinkTable.setId(RandomUtil.uuId());
            sinkTable.setConfigId(configId);
            this.daapCaptureSinkTableService.save(sinkTable);
            if (ObjectUtil.isNotNull(mappingTable.getSinkTable().getSinkFieldModels())) {
                List<DaapCaptureSinkFieldEntity> sinkFields = JsonUtil.getJsonToList(mappingTable.getSinkTable().getSinkFieldModels(), DaapCaptureSinkFieldEntity.class);
                for (DaapCaptureSinkFieldEntity sinkField : sinkFields) {
                    sinkField.setId(RandomUtil.uuId());
                    sinkField.setTableId(sinkTable.getId());
                }
                this.daapCaptureSinkFieldService.saveBatch(sinkFields);
            }
        }
        return configId;
    }

    /**
     * Updates a capture configuration, its source and its sink in place, then replaces all
     * table and field rows of the configuration with the mappings from the model.
     *
     * <p>The configuration id from {@code configModel.getId()} is used for every lookup,
     * delete and re-insert, so the replacement does not depend on the nested source model
     * carrying a matching {@code configId}.
     *
     * @param configModel the new state of the configuration; its id selects the rows to replace
     * @return always {@code true}
     */
    @Override
    @DSTransactional
    public boolean updateCaptureConfig(CaptureConfigModel configModel) {
        String configId = configModel.getId();

        // Read the current tables first: their ids are needed to delete the field rows
        // after the table rows themselves are gone.
        List<DaapCaptureSourceTableEntity> oldSourceTables = this.daapCaptureSourceTableService.getSourceTableListByConfigId(configId);
        List<DaapCaptureSinkTableEntity> oldSinkTables = this.daapCaptureSinkTableService.getSinkTableByConfigId(configId);

        DaapCaptureConfigEntity configEntity = (DaapCaptureConfigEntity) JsonUtil.getJsonToBean(configModel, DaapCaptureConfigEntity.class);
        this.updateById(configEntity);
        DaapCaptureSourceEntity sourceEntity = (DaapCaptureSourceEntity) JsonUtil.getJsonToBean(configModel.getCaptureSource(), DaapCaptureSourceEntity.class);
        this.daapCaptureSourceService.updateById(sourceEntity);
        DaapCaptureSinkEntity sinkEntity = (DaapCaptureSinkEntity) JsonUtil.getJsonToBean(configModel.getCaptureSink(), DaapCaptureSinkEntity.class);
        this.daapCaptureSinkService.updateById(sinkEntity);

        this.daapCaptureSourceTableService.deleteByConfigId(configId);
        this.daapCaptureSinkTableService.deleteSinkTableByConfigId(configId);
        for (DaapCaptureSourceTableEntity oldSourceTable : oldSourceTables) {
            this.daapCaptureSourceFieldService.deleteByTableId(oldSourceTable.getId());
        }
        for (DaapCaptureSinkTableEntity oldSinkTable : oldSinkTables) {
            this.daapCaptureSinkFieldService.deleteByTableId(oldSinkTable.getId());
        }

        List<MappingTableModel> mappingTables = configModel.getMappingTables();
        if (mappingTables == null) {
            return true;
        }
        for (MappingTableModel mappingTable : mappingTables) {
            DaapCaptureSourceTableEntity sourceTable = (DaapCaptureSourceTableEntity) JsonUtil.getJsonToBean(mappingTable.getSourceTable(), DaapCaptureSourceTableEntity.class);
            sourceTable.setId(RandomUtil.uuId());
            sourceTable.setConfigId(configId);
            this.daapCaptureSourceTableService.save(sourceTable);
            // Same null guard as create(): a table may arrive without field models.
            if (ObjectUtil.isNotNull(mappingTable.getSourceTable().getSourceFieldModels())) {
                List<DaapCaptureSourceFieldEntity> sourceFields = JsonUtil.getJsonToList(mappingTable.getSourceTable().getSourceFieldModels(), DaapCaptureSourceFieldEntity.class);
                for (DaapCaptureSourceFieldEntity sourceField : sourceFields) {
                    sourceField.setId(RandomUtil.uuId());
                    sourceField.setSourceTableId(sourceTable.getId());
                }
                this.daapCaptureSourceFieldService.saveBatch(sourceFields);
            }

            DaapCaptureSinkTableEntity sinkTable = (DaapCaptureSinkTableEntity) JsonUtil.getJsonToBean(mappingTable.getSinkTable(), DaapCaptureSinkTableEntity.class);
            sinkTable.setId(RandomUtil.uuId());
            sinkTable.setConfigId(configId);
            this.daapCaptureSinkTableService.save(sinkTable);
            if (ObjectUtil.isNotNull(mappingTable.getSinkTable().getSinkFieldModels())) {
                List<DaapCaptureSinkFieldEntity> sinkFields = JsonUtil.getJsonToList(mappingTable.getSinkTable().getSinkFieldModels(), DaapCaptureSinkFieldEntity.class);
                for (DaapCaptureSinkFieldEntity sinkField : sinkFields) {
                    sinkField.setId(RandomUtil.uuId());
                    sinkField.setTableId(sinkTable.getId());
                }
                this.daapCaptureSinkFieldService.saveBatch(sinkFields);
            }
        }
        return true;
    }

    /**
     * Deletes a capture configuration and its dependent rows: source, source tables and
     * fields, sink, sink tables and fields. A {@code null} entity is ignored.
     *
     * @param entity the configuration to delete; only its id is used
     */
    @Override
    @DSTransactional
    public void delete(DaapCaptureConfigEntity entity) {
        if (entity == null) {
            return;
        }
        String configId = entity.getId();

        // Source or sink rows may be absent for a partially created configuration;
        // skip them instead of failing before the remaining rows are removed.
        DaapCaptureSourceEntity captureSource = this.daapCaptureSourceService.getCaptureSourceByConfigId(configId);
        if (captureSource != null) {
            this.daapCaptureSourceService.removeById(captureSource.getId());
        }
        List<DaapCaptureSourceTableEntity> sourceTables = this.daapCaptureSourceTableService.getSourceTableListByConfigId(configId);
        for (DaapCaptureSourceTableEntity sourceTable : sourceTables) {
            this.daapCaptureSourceFieldService.deleteByTableId(sourceTable.getId());
        }
        // One delete by configuration id covers all source tables; no need to repeat it per table.
        this.daapCaptureSourceTableService.deleteByConfigId(configId);

        DaapCaptureSinkEntity captureSink = this.daapCaptureSinkService.getSinkByConfigId(configId);
        if (captureSink != null) {
            this.daapCaptureSinkService.removeById(captureSink.getId());
        }
        List<DaapCaptureSinkTableEntity> sinkTables = this.daapCaptureSinkTableService.getSinkTableByConfigId(configId);
        for (DaapCaptureSinkTableEntity sinkTable : sinkTables) {
            this.daapCaptureSinkTableService.removeById(sinkTable.getId());
            this.daapCaptureSinkFieldService.deleteByTableId(sinkTable.getId());
        }

        this.removeById(configId);
    }

    /**
     * Submits the capture job of a configuration and re-links the stored job records to
     * the new job id: the checkpoint and savepoint rows of the previous job id are moved
     * to the new id, the exception and manager-info rows of the previous job id are
     * removed, and the configuration is marked RUNNING with the new job id and the
     * current time as activation time.
     *
     * @param id              id of the capture configuration
     * @param checkOrSavePath passed through to {@code submitRemoteBySql}
     * @return always {@code true} when no exception is thrown
     * @throws BaseException when the configuration does not exist or the submission
     *                       returns no job id
     * @throws Exception     anything raised while loading or submitting
     */
    @Override
    @DSTransactional
    public boolean submitCapture(String id, String checkOrSavePath) throws Exception {
        CaptureConfigModel configModel = this.getCaptureConfigInfo(id);
        if (configModel == null) {
            // Fail with a clear message instead of a NullPointerException further down.
            throw new BaseException("\u6240\u5173\u8054\u7684\u91c7\u96c6\u5411\u5bfc\u4e0d\u5b58\u5728");
        }
        String flinkJobId = this.submitRemoteBySql(configModel, checkOrSavePath);
        if (StringUtils.isEmpty(flinkJobId)) {
            throw new BaseException("\u4efb\u52a1\u63d0\u4ea4\u5931\u8d25");
        }

        // Job id of the previous run; all bookkeeping rows are still keyed by it.
        String previousJobId = configModel.getJobId();

        DaapFlinkCheckpointEntity checkpoint = this.daapFlinkCheckpointService.getDaapFlinkCheckpointByJobId(previousJobId);
        if (checkpoint != null) {
            checkpoint.setJobId(flinkJobId);
            this.daapFlinkCheckpointService.updateById(checkpoint);
        }
        DaapFlinkExcepointEntity excepoint = this.daapFlinkExcepointService.getDaapFlinkExcepointByJobId(previousJobId);
        if (excepoint != null) {
            this.daapFlinkExcepointService.removeById(excepoint);
        }
        DaapFlinkManagerInfoEntity managerInfo = this.daapFlinkManagerInfoService.getDaapFlinkManagerInfoByJobId(previousJobId);
        if (managerInfo != null) {
            this.daapFlinkManagerInfoService.removeById(managerInfo);
        }
        List<DaapFlinkSavepointEntity> savepoints = this.daapFlinkSavepointService.getDaapFlinkSavepointByJobId(previousJobId);
        if (savepoints != null) {
            for (DaapFlinkSavepointEntity savepoint : savepoints) {
                savepoint.setJobId(flinkJobId);
                this.daapFlinkSavepointService.updateById(savepoint);
            }
        }

        DaapCaptureConfigEntity configEntity = (DaapCaptureConfigEntity) JsonUtil.getJsonToBean(configModel, DaapCaptureConfigEntity.class);
        configEntity.setEnabledMark(EnabledMarkEnum.RUNNING.getCode());
        configEntity.setJobId(flinkJobId);
        configEntity.setActivateTime(new Date());
        this.updateById(configEntity);
        return true;
    }

    /**
     * Builds the CREATE TABLE statements for the configuration with the given id.
     *
     * @param id id of the capture configuration
     * @return the generated SQL, or a fixed message text when the configuration is not found
     * @throws Exception propagated from loading the configuration
     */
    @Override
    public String getCaptureCreatTableSqlById(String id) throws Exception {
        CaptureConfigModel model = this.getCaptureConfigInfo(id);
        return model == null
                ? "\u6240\u5173\u8054\u7684\u91c7\u96c6\u5411\u5bfc\u4e0d\u5b58\u5728"
                : this.getFlinkCreatTableSql(model);
    }

    /**
     * Concatenates, for every mapping of the configuration, the source-table and
     * sink-table SQL produced by {@code generateDdl}.
     *
     * <p>When the sink connector's data source type contains "CDC", its properties are
     * first rewritten through {@code DataTypeChang} and stored back on the connector
     * object. Generation is best-effort: if a mapping fails, the error is logged and the
     * SQL collected so far is returned.
     *
     * @param configModel configuration whose source/sink connectors and mappings are used
     * @return the concatenated SQL, possibly empty
     */
    public String getFlinkCreatTableSql(CaptureConfigModel configModel) {
        DaapConnectorEntity sourceConnector = this.daapConnectorService.getInfo(configModel.getCaptureSource().getConnectorId());
        DaapConnectorEntity sinkConnector = this.daapConnectorService.getInfo(configModel.getCaptureSink().getConnectorId());
        if (StringUtils.isNotNull(sinkConnector)) {
            String type = sinkConnector.getDataSourceType();
            if (StringUtils.isNotEmpty(type) && type.contains("CDC")) {
                JSONObject dataSourceProperties = JSON.parseObject(sinkConnector.getDataSourceProperties());
                sinkConnector.setDataSourceProperties(JSON.toJSONString(this.daapConnectorService.DataTypeChang(dataSourceProperties, type)));
            }
        }

        StringBuilder flinkSqlBuilder = new StringBuilder();
        List<MappingTableModel> mappingTables = configModel.getMappingTables();
        if (mappingTables == null) {
            return flinkSqlBuilder.toString();
        }
        try {
            for (MappingTableModel mappingTable : mappingTables) {
                MappingTableSqlModel tableSqlModel = this.generateDdl(sourceConnector, sinkConnector, mappingTable);
                flinkSqlBuilder.append(tableSqlModel.getSourceTableSql()).append(tableSqlModel.getSinkTableSql());
            }
        } catch (Exception e) {
            // Best-effort by design; log with the stack trace instead of printing to stderr.
            this.log.error("Failed to generate Flink CREATE TABLE SQL", e);
        }
        return flinkSqlBuilder.toString();
    }

    /**
     * Generates the Flink SQL for a configuration, split into table creation and sync parts.
     *
     * <p>When the sink connector's data source type contains "CDC", its properties are
     * first rewritten through {@code DataTypeChang} and stored back on the connector
     * object. Generation is best-effort: if a mapping fails, the error is logged and the
     * SQL collected so far is returned.
     *
     * @param configModel configuration whose source/sink connectors and mappings are used
     * @return a map with the keys {@code "creatTableSql"} (source and sink table SQL of all
     *         mappings) and {@code "syncTableSql"} (insert SQL of all mappings)
     */
    @Override
    public Map<String, String> getCaptureFlinkSql(CaptureConfigModel configModel) {
        DaapConnectorEntity sourceConnector = this.daapConnectorService.getInfo(configModel.getCaptureSource().getConnectorId());
        DaapConnectorEntity sinkConnector = this.daapConnectorService.getInfo(configModel.getCaptureSink().getConnectorId());
        if (StringUtils.isNotNull(sinkConnector)) {
            String type = sinkConnector.getDataSourceType();
            if (StringUtils.isNotEmpty(type) && type.contains("CDC")) {
                JSONObject dataSourceProperties = JSON.parseObject(sinkConnector.getDataSourceProperties());
                sinkConnector.setDataSourceProperties(JSON.toJSONString(this.daapConnectorService.DataTypeChang(dataSourceProperties, type)));
            }
        }

        StringBuilder createSql = new StringBuilder();
        StringBuilder syncSql = new StringBuilder();
        List<MappingTableModel> mappingTables = configModel.getMappingTables();
        if (mappingTables != null) {
            try {
                for (MappingTableModel mappingTable : mappingTables) {
                    MappingTableSqlModel tableSqlModel = this.generateDdl(sourceConnector, sinkConnector, mappingTable);
                    createSql.append(tableSqlModel.getSourceTableSql()).append(tableSqlModel.getSinkTableSql());
                    syncSql.append(tableSqlModel.getInsertSql());
                }
            } catch (Exception e) {
                // Best-effort by design; log with the stack trace instead of printing to stderr.
                this.log.error("Failed to generate Flink capture SQL", e);
            }
        }

        Map<String, String> resultFlinkSqlMap = new HashMap<String, String>();
        resultFlinkSqlMap.put("creatTableSql", createSql.toString());
        resultFlinkSqlMap.put("syncTableSql", syncSql.toString());
        return resultFlinkSqlMap;
    }

    /**
     * Issues a GET to {@code http://flinkHost:port/jobs/{jobId}/yarn-cancel} and marks the
     * configuration linked to the job as PAUSE_RUN.
     *
     * @param jobId     id of the job to cancel
     * @param flinkHost host of the REST endpoint
     * @param port      port of the REST endpoint
     * @return always {@code false}
     */
    @Override
    public boolean getFlinkStopJob(String jobId, String flinkHost, int port) {
        String cancelUrl = "http://" + flinkHost + ":" + port + "/jobs/" + jobId + "/yarn-cancel";
        HttpUtil.get(cancelUrl);

        DaapCaptureConfigEntity captureConfig = this.getCaptureConfigByJobId(jobId);
        if (captureConfig == null) {
            // The cancel request was already sent; there is just no row to update.
            this.log.warn("No capture configuration found for job " + jobId + "; status not updated");
        } else {
            captureConfig.setEnabledMark(EnabledMarkEnum.PAUSE_RUN.getCode());
            this.updateById(captureConfig);
        }
        // NOTE(review): the method has always returned false, even on success. Kept unchanged
        // because callers are not visible here; confirm whether true was intended.
        return false;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Stops a job and, when the linked configuration has savepoints enabled (enabled mark
     * "1"), first triggers a savepoint into the configured directory and records it.
     *
     * <p>A savepoint failure is logged and does not prevent the stop; in that case an
     * empty path is returned. The cluster client is closed even if stopping the job fails.
     *
     * <p>NOTE(review): if no configuration matches {@code jobIdStr} the lookup result is
     * dereferenced unguarded, as before; confirm whether a dedicated error is wanted.
     *
     * @param jobIdStr  hex string of the job id
     * @param flinkHost host of the REST endpoint
     * @param port      port of the REST endpoint
     * @return the savepoint path, or an empty string when none was taken
     * @throws Exception when the client cannot be created/closed or stopping the job fails
     */
    @Override
    public String getFlinkSavePointAndStopJob(String jobIdStr, String flinkHost, int port) throws Exception {
        DaapCaptureConfigEntity captureConfig = this.getCaptureConfigByJobId(jobIdStr);
        // A configuration without savepoint settings is treated as "savepoint disabled".
        boolean savepointEnabled = captureConfig.getSavepoint() != null
                && "1".equals(captureConfig.getSavepoint().getEnabledMark());
        if (!savepointEnabled) {
            this.getFlinkStopJob(jobIdStr, flinkHost, port);
            return "";
        }

        JobID jobId = JobID.fromHexString(jobIdStr);
        Configuration config = new Configuration();
        config.setString(RestOptions.ADDRESS, flinkHost);
        config.setInteger(RestOptions.PORT, port);
        RestClusterClient<String> clusterClient = new RestClusterClient<>(config, "TestClusterClient");
        String savepointPath = "";
        try {
            CompletableFuture<String> savepointFuture = clusterClient.triggerSavepoint(
                    jobId, captureConfig.getSavepoint().getSavepointDirectory(), SavepointFormatType.CANONICAL);
            savepointPath = savepointFuture.get();
            this.log.debug("Savepoint path: " + savepointPath);

            DaapFlinkSavepointEntity savepointEntity = new DaapFlinkSavepointEntity();
            savepointEntity.setId(RandomUtil.uuId());
            savepointEntity.setCaptureConfigId(captureConfig.getId());
            savepointEntity.setJobId(captureConfig.getJobId());
            savepointEntity.setSavepointUrl(savepointPath);
            savepointEntity.setTaskStatus("COMPLETED");
            this.daapFlinkSavepointService.save(savepointEntity);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers up the stack can still observe it.
            Thread.currentThread().interrupt();
            this.log.error("Interrupted while waiting for savepoint of job " + jobIdStr, e);
        } catch (Exception e) {
            this.log.error("Failed to take savepoint for job " + jobIdStr, e);
        } finally {
            try {
                this.getFlinkStopJob(jobIdStr, flinkHost, port);
            } finally {
                // Must run even when the stop call throws, otherwise the client leaks.
                clusterClient.close();
            }
        }
        return savepointPath;
    }

    /**
     * Fetches the exception history of a Flink job from the {@code /jobs/{id}/exceptions}
     * REST endpoint.
     *
     * @param jobId     Flink job id
     * @param flinkHost host of the Flink REST endpoint
     * @param flinkPort port of the Flink REST endpoint
     * @param max       optional limit sent as {@code maxExceptions}; ignored when null or not positive
     * @return the parsed JSON response
     */
    @Override
    public JSONObject getJobException(String jobId, String flinkHost, Integer flinkPort, Integer max) {
        String url = "http://" + flinkHost + ":" + flinkPort + "/jobs/" + jobId + "/exceptions";
        boolean limited = max != null && max > 0;
        if (limited) {
            url = url + "?maxExceptions=" + max;
        }
        String responseBody = HttpUtil.get(url);
        return JSONObject.parseObject(responseBody);
    }

    /**
     * Loads the checkpoint information stored for a job and parses it as JSON.
     *
     * @param jobId Flink job id
     * @return the parsed checkpoint info; the result of parsing an empty string when no
     *         checkpoint record exists for the job
     */
    @Override
    public JSONObject checkpointByDb(String jobId) {
        DaapFlinkCheckpointEntity checkpoint = this.daapFlinkCheckpointService.getDaapFlinkCheckpointByJobId(jobId);
        String checkpointInfo = checkpoint == null ? "" : checkpoint.getCheckpointInfo();
        return JSONObject.parseObject(checkpointInfo);
    }

    /**
     * Returns the savepoint records stored for a job.
     *
     * @param jobId Flink job id
     * @return whatever the savepoint service returns for that job id
     */
    @Override
    public List<DaapFlinkSavepointEntity> savepointByDb(String jobId) {
        return this.daapFlinkSavepointService.getDaapFlinkSavepointByJobId(jobId);
    }

    /**
     * Fetches checkpoint statistics of a Flink job from the {@code /jobs/{id}/checkpoints}
     * REST endpoint.
     *
     * @param jobId     Flink job id
     * @param flinkHost host of the Flink REST endpoint
     * @param flinkPort port of the Flink REST endpoint
     * @return the parsed JSON response
     */
    @Override
    public JSONObject getJobCheckPoint(String jobId, String flinkHost, Integer flinkPort) {
        String url = String.format("http://%s:%s/jobs/%s/checkpoints", flinkHost, flinkPort, jobId);
        String responseBody = HttpUtil.get(url);
        return JSONObject.parseObject(responseBody);
    }

    /**
     * Builds the Flink SQL for one table mapping: the {@code source_<table>} DDL, the
     * {@code sink_<table>} DDL (source columns plus a {@code sync_time} column) and the
     * {@code insert into ... select ...} statement, optionally filtered by the source table's
     * configured params.
     *
     * <p>Side effect: calls {@code updateCreatTable} for the sink table before returning.
     *
     * @param sourceConnector  connector whose properties populate the source table's WITH clause
     * @param sinkConnector    connector whose properties populate the sink table's WITH clause
     * @param mappingTableModel source/sink table mapping, including source fields and filters
     * @return the three generated SQL statements
     * @throws Exception if connector properties cannot be parsed or updating the sink table fails
     */
    public MappingTableSqlModel generateDdl(DaapConnectorEntity sourceConnector, DaapConnectorEntity sinkConnector, MappingTableModel mappingTableModel) throws Exception {
        String sourceTableName = mappingTableModel.getSourceTable().getTableName();
        String sinkTableName = mappingTableModel.getSinkTable().getTableName();
        String sourceConnectorStr = this.getConnectorString(sourceConnector.getDataSourceProperties(), sourceTableName);
        String sinkConnectorStr = this.getConnectorString(sinkConnector.getDataSourceProperties(), sinkTableName);

        // The sink table mirrors the source columns, so one pass over the source fields
        // serves both CREATE TABLE statements and the select list.
        StringBuilder columnDefinitions = new StringBuilder();
        StringBuilder selectColumns = new StringBuilder();
        List<String> primaryKeys = new ArrayList<String>();
        for (SourceFieldModel field : mappingTableModel.getSourceTable().getSourceFieldModels()) {
            if ("1".equals(field.getPrimaryKey())) {
                primaryKeys.add(field.getFieldName());
            }
            columnDefinitions.append("  ").append(field.getFieldName()).append(" ").append(field.getDataType()).append(",").append("\n");
            selectColumns.append(field.getFieldName()).append(",").append("\n");
        }
        String primaryKeyList = String.join(",", primaryKeys);

        MappingTableSqlModel tableSqlModel = new MappingTableSqlModel();
        String sourceCreateSql = String.format("CREATE TABLE source_%s (\n%s  PRIMARY KEY (%s) NOT ENFORCED -- \u4e3b\u952e\n) WITH ( \n %s \n);\n", sourceTableName, columnDefinitions.toString(), primaryKeyList, sourceConnectorStr);
        tableSqlModel.setSourceTableSql(sourceCreateSql);
        String sinkCreateTable = String.format("CREATE TABLE sink_%s (\n%s  PRIMARY KEY (%s) NOT ENFORCED -- \u4e3b\u952e\n) WITH ( \n %s \n);\n", sinkTableName, columnDefinitions.toString() + "sync_time TIMESTAMP,", primaryKeyList, sinkConnectorStr);
        tableSqlModel.setSinkTableSql(sinkCreateTable);

        // Only the fixed prefix goes through String.format; the WHERE clause is appended
        // afterwards because filter values may legitimately contain '%' (e.g. LIKE patterns),
        // which must not be interpreted as format specifiers.
        String insertPrefix = String.format("insert into sink_%s select %s from source_%s ", sinkTableName, selectColumns.toString() + "CURRENT_TIMESTAMP as sync_time", sourceTableName);
        String insertSql = insertPrefix + this.buildWhereClause(mappingTableModel) + " ;\n";

        this.updateCreatTable(sinkTableName, sinkConnector.getDataSourceProperties());
        tableSqlModel.setInsertSql(insertSql);
        return tableSqlModel;
    }

    /**
     * Renders the source table's filter params as a {@code where a AND b ...} clause.
     * Filters whose field name matches no source field are skipped with a warning; returns an
     * empty string when no filter applies.
     */
    private String buildWhereClause(MappingTableModel mappingTableModel) {
        String params = mappingTableModel.getSourceTable().getParams();
        if (!StringUtils.isNotEmpty(params)) {
            return "";
        }
        JSONArray filters = JSON.parseArray(params);
        if (filters == null) {
            return "";
        }
        List<String> conditions = new ArrayList<String>();
        for (int i = 0; i < filters.size(); ++i) {
            JSONObject filter = filters.getJSONObject(i);
            String fieldName = filter.getString("fieldName");
            boolean matched = false;
            for (SourceFieldModel sourceFieldModel : mappingTableModel.getSourceTable().getSourceFieldModels()) {
                if (fieldName == null || !fieldName.equals(sourceFieldModel.getFieldName())) {
                    continue;
                }
                // Only the first field with a matching name is used.
                conditions.add(buildFilterCondition(filter, sourceFieldModel.getDataType()));
                matched = true;
                break;
            }
            if (!matched) {
                this.log.warn("Ignoring filter on unknown field '" + fieldName + "' of table " + mappingTableModel.getSourceTable().getTableName());
            }
        }
        // Joining the collected conditions keeps the separators correct even when some
        // filters were skipped.
        return conditions.isEmpty() ? "" : "where " + String.join(" AND ", conditions);
    }

    /**
     * Renders one filter as a SQL predicate, casting according to the column's data type.
     * NOTE(review): fieldCondition and criteria are concatenated into the SQL verbatim; they
     * must come from trusted configuration.
     */
    private static String buildFilterCondition(JSONObject filter, String dataType) {
        String fieldName = filter.getString("fieldName");
        String operator = filter.getString("fieldCondition");
        String criteria = filter.getString("criteria");
        // DATE is checked first, so types such as DATETIME take the string comparison branch.
        if (dataType.contains("DATE")) {
            return "CAST(`" + fieldName + "` AS STRING) " + operator + " '" + criteria + "'";
        }
        if (dataType.contains("TIMESTAMP")) {
            return "CAST (`" + fieldName + "` AS TIMESTAMP) " + operator + " CAST ('" + criteria + "' AS TIMESTAMP)";
        }
        return fieldName + " " + operator + " " + criteria;
    }

    /**
     * Renders connector properties as the option list of a Flink SQL {@code WITH} clause:
     * {@code 'key'='value'} entries separated by {@code ", \n"}, with {@code table-name} set to
     * the given table.
     *
     * @param connector JSON object holding the connector properties
     * @param tableName value stored under the {@code table-name} option (overrides any existing one)
     * @return the option list without surrounding parentheses
     * @throws IllegalArgumentException if {@code connector} does not parse to a JSON object
     */
    public String getConnectorString(String connector, String tableName) {
        JSONObject properties = JSONObject.parseObject(connector);
        if (properties == null) {
            throw new IllegalArgumentException("Connector properties must be a non-empty JSON object");
        }
        properties.put("table-name", (Object)tableName);
        List<String> options = new ArrayList<String>(properties.size());
        for (String key : properties.keySet()) {
            // Values are not printed or logged here: they may contain credentials.
            options.add("'" + key + "'='" + properties.getString(key) + "'");
        }
        return String.join(", \n", options);
    }

    /**
     * Builds the Flink SQL used for simulated ("datagen") data of one table mapping: a
     * {@code datagen_<table>} source DDL, a {@code sink_datagen_<table>} sink DDL and the insert
     * statement copying from the former to the latter. Both tables use the source table's columns.
     *
     * <p>The WITH clauses are the {@code toString()} renderings of the connector properties
     * deserialised as {@code MysqlCdcWithProperties} (source) and {@code FileSystemWithProperties}
     * (sink).
     *
     * @param sourceConnector   connector providing the source WITH options
     * @param sinkConnector     connector providing the sink WITH options
     * @param mappingTableModel mapping whose source table name and fields are used
     * @param rowsPerSecond     currently not used by the generated SQL
     * @return the three generated SQL statements
     */
    public MappingTableSqlModel generateAnalogDataDdl(DaapConnectorEntity sourceConnector, DaapConnectorEntity sinkConnector, MappingTableModel mappingTableModel, int rowsPerSecond) {
        MysqlCdcWithProperties sourceModel = (MysqlCdcWithProperties)JsonUtil.getJsonToBean((String)sourceConnector.getDataSourceProperties(), MysqlCdcWithProperties.class);
        FileSystemWithProperties sinkModel = (FileSystemWithProperties)JsonUtil.getJsonToBean((String)sinkConnector.getDataSourceProperties(), FileSystemWithProperties.class);
        String tableName = mappingTableModel.getSourceTable().getTableName();

        StringBuilder columnDefinitions = new StringBuilder();
        StringBuilder primaryKeys = new StringBuilder();
        for (SourceFieldModel field : mappingTableModel.getSourceTable().getSourceFieldModels()) {
            if ("1".equals(field.getPrimaryKey())) {
                primaryKeys.append(field.getFieldName()).append(",");
            }
            columnDefinitions.append("  ").append(field.getFieldName()).append(" ").append(field.getDataType()).append(",").append("\n");
        }
        String primaryKeyList = StrUtil.removeSuffix((CharSequence)primaryKeys.toString(), (CharSequence)",");

        MappingTableSqlModel analogDataSqlModel = new MappingTableSqlModel();
        String sourceCreateSql = String.format("CREATE TABLE datagen_%s (%s  PRIMARY KEY (%s) NOT ENFORCED -- \u4e3b\u952e\n) WITH %s; \n", tableName, columnDefinitions.toString(), primaryKeyList, sourceModel.toString());
        analogDataSqlModel.setSourceTableSql(sourceCreateSql);
        // The sink options need their own placeholder; without it the statement ends in an
        // empty WITH clause and the sink connector settings are silently dropped.
        String sinkCreateTable = String.format("CREATE TABLE sink_datagen_%s (%s  PRIMARY KEY (%s) NOT ENFORCED -- \u4e3b\u952e\n) WITH %s ;", tableName, columnDefinitions.toString(), primaryKeyList, sinkModel.toString());
        analogDataSqlModel.setSinkTableSql(sinkCreateTable);
        String insertSql = String.format("insert into sink_datagen_%s select * from datagen_%s ;", tableName, tableName);
        analogDataSqlModel.setInsertSql(insertSql);
        return analogDataSqlModel;
    }

    /**
     * Builds a Flink table job from a capture configuration and submits it.
     *
     * <p>Steps: resolve source/sink connectors (converting the sink properties via
     * {@code DataTypeChang} when the sink type contains "CDC"), create a remote or local
     * execution environment, apply parallelism, checkpointing and restart settings, register the
     * source/sink tables of every mapping, add the insert statements (custom ETL content if
     * present, generated ones otherwise) and execute them as one statement set. For "BATCH"
     * reads, the newest row of each table is queried afterwards and stored back as the table's
     * filter criteria.
     *
     * @param configModel     capture configuration to run
     * @param checkOrSavePath optional checkpoint/savepoint path to restore from; ignored when empty
     * @return the submitted job's id, or {@code null} when submission fails (the failure is logged)
     */
    public String submitRemoteBySql(CaptureConfigModel configModel, String checkOrSavePath) {
        DaapConnectorEntity sourceConnector = this.daapConnectorService.getInfo(configModel.getCaptureSource().getConnectorId());
        DaapConnectorEntity sinkConnector = this.daapConnectorService.getInfo(configModel.getCaptureSink().getConnectorId());
        if (StringUtils.isNotNull((Object)sinkConnector)) {
            String type = sinkConnector.getDataSourceType();
            if (StringUtils.isNotEmpty((String)type) && type.contains("CDC")) {
                JSONObject dataSourceProperties = JSON.parseObject((String)sinkConnector.getDataSourceProperties());
                // The converted properties are deliberately not printed: they hold credentials.
                sinkConnector.setDataSourceProperties(JSON.toJSONString(this.daapConnectorService.DataTypeChang(dataSourceProperties, type)));
            }
        }
        try {
            boolean batchRead = "BATCH".equals(configModel.getCaptureSource().getReadMethod());
            DaapJobClusterEntity clusterEntity = this.daapJobClusterService.getInfo(configModel.getFlinkResourceId());
            Configuration config = new Configuration();
            if (StringUtils.isNotEmpty((String)checkOrSavePath)) {
                config.setString("execution.savepoint.path", checkOrSavePath);
            }
            StreamExecutionEnvironment remoteEnvironment = clusterEntity.getType().equals("remote") ? FlinkEnvironmentHandle.createRemoteEnvironment(clusterEntity.getJobManagerHost(), clusterEntity.getPort(), config) : FlinkEnvironmentHandle.createLocalEnvironment();
            if (sourceConnector.getDataSourceType().contains("JDBC") && batchRead) {
                remoteEnvironment.setRuntimeMode(RuntimeExecutionMode.BATCH);
            }
            remoteEnvironment.setParallelism(configModel.getParallelism());
            if ("1".equals(configModel.getCheckpoint().getEnabledMark())) {
                remoteEnvironment.enableCheckpointing(configModel.getCheckpoint().getCheckpointingInterval() * 1000L, CheckpointingMode.EXACTLY_ONCE);
                remoteEnvironment.getCheckpointConfig().setCheckpointTimeout(configModel.getCheckpoint().getCheckpointingTimeout() * 1000L);
                remoteEnvironment.getCheckpointConfig().setCheckpointStorage(configModel.getCheckpoint().getCheckpointDirectory());
                remoteEnvironment.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
                remoteEnvironment.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
                remoteEnvironment.setStateBackend((StateBackend)new FsStateBackend(configModel.getCheckpoint().getCheckpointDirectory()));
            }
            if ("1".equals(configModel.getRestartStrategy().getEnabledMark())) {
                remoteEnvironment.setRestartStrategy(RestartStrategies.fixedDelayRestart((int)configModel.getRestartStrategy().getAttempts(), (Time)Time.seconds((long)configModel.getRestartStrategy().getDelay())));
            }
            StreamTableEnvironment tableEnv = StreamTableEnvironment.create((StreamExecutionEnvironment)remoteEnvironment);
            Configuration configuration = tableEnv.getConfig().getConfiguration();
            configuration.setString("pipeline.name", configModel.getName());
            List<MappingTableModel> mappingTables = configModel.getMappingTables();
            StreamStatementSet statementSet = tableEnv.createStatementSet();
            // Each "latest row" query is stored together with the mapping it belongs to, so a
            // failed query for one table cannot shift the pairing of the remaining ones.
            List<Table> resultTables = new ArrayList<Table>();
            List<MappingTableModel> resultTableOwners = new ArrayList<MappingTableModel>();
            if (StringUtils.isNotEmpty((String)configModel.getEtlContent())) {
                for (MappingTableModel mappingTable : mappingTables) {
                    MappingTableSqlModel tableSqlModel = this.generateDdl(sourceConnector, sinkConnector, mappingTable);
                    tableEnv.executeSql(tableSqlModel.getSourceTableSql());
                    tableEnv.executeSql(tableSqlModel.getSinkTableSql());
                }
                // Custom ETL content replaces the generated insert statements.
                for (String statement : configModel.getEtlContent().split(";")) {
                    if (!StringUtils.isNotEmpty((String)statement)) {
                        continue;
                    }
                    this.log.debug(statement);
                    statementSet.addInsertSql(statement);
                }
            } else {
                for (MappingTableModel mappingTable : mappingTables) {
                    MappingTableSqlModel tableSqlModel = this.generateDdl(sourceConnector, sinkConnector, mappingTable);
                    tableEnv.executeSql(tableSqlModel.getSourceTableSql());
                    tableEnv.executeSql(tableSqlModel.getSinkTableSql());
                    statementSet.addInsertSql(tableSqlModel.getInsertSql());
                    if (batchRead) {
                        try {
                            // When several fields are flagged as primary key, the last one wins.
                            String orderColumn = null;
                            for (SourceFieldModel field : mappingTable.getSourceTable().getSourceFieldModels()) {
                                if ("1".equals(field.getPrimaryKey())) {
                                    orderColumn = field.getFieldName();
                                }
                            }
                            // Reuse the select part of the generated insert statement
                            // (everything after the sink table name, up to the ';').
                            String[] querySqlArray = tableSqlModel.getInsertSql().split("sink_" + mappingTable.getSinkTable().getTableName());
                            String querySql = querySqlArray[1].substring(0, querySqlArray[1].indexOf(";"));
                            Table resultTable = tableEnv.sqlQuery(querySql + " ORDER BY " + orderColumn + " DESC LIMIT 1");
                            resultTables.add(resultTable);
                            resultTableOwners.add(mappingTable);
                        }
                        catch (Exception e) {
                            this.log.error("\u6279\u91cf\u91c7\u96c6\u6570\u636e\uff0c\u83b7\u53d6\u6700\u65b0\u6570\u636e\u5931\u8d25---" + e.getMessage(), e);
                        }
                    }
                    if ("1".equals(configModel.getAnalogData().getEnabledMark())) {
                        MappingTableSqlModel genDataSqlModel = this.generateAnalogDataDdl(sourceConnector, sinkConnector, mappingTable, configModel.getAnalogData().getRowsPerSecond());
                        tableEnv.executeSql(genDataSqlModel.getSourceTableSql());
                        tableEnv.executeSql(genDataSqlModel.getSinkTableSql());
                        statementSet.addInsertSql(genDataSqlModel.getInsertSql());
                    }
                }
            }
            TableResult result = statementSet.execute();
            Optional<JobClient> jobClient = result.getJobClient();
            if (batchRead) {
                try {
                    this.refreshBatchCriteria(resultTables, resultTableOwners);
                }
                catch (Exception e) {
                    // Best effort: the job has already been submitted at this point.
                    this.log.error("\u66f4\u65b0\u8bb0\u5f55\u6700\u65b0\u6570\u636e\u67e5\u8be2\u6761\u4ef6\u5931\u8d25--" + e.getMessage(), e);
                }
            }
            if (!jobClient.isPresent()) {
                this.log.error("Flink job '" + configModel.getName() + "' was submitted but no job client was returned");
                return null;
            }
            return jobClient.get().getJobID().toString();
        }
        catch (Exception e) {
            this.log.error("Failed to submit Flink job '" + configModel.getName() + "': " + e.getMessage(), e);
            return null;
        }
    }

    /**
     * For every "latest row" query, executes it and stores the row's values as the new
     * {@code criteria} of the owning source table's filters. Tables without filters, or whose
     * params contain {@code <} or {@code =}, are left unchanged.
     */
    private void refreshBatchCriteria(List<Table> resultTables, List<MappingTableModel> owners) throws Exception {
        for (int i = 0; i < resultTables.size(); ++i) {
            MappingTableModel mappingTable = owners.get(i);
            TableResult tableResult = resultTables.get(i).execute();
            String[] fieldNames = tableResult.getTableSchema().getFieldNames();
            // collect() holds job resources until closed, so release it deterministically.
            try (CloseableIterator<Row> iterator = tableResult.collect()) {
                while (iterator.hasNext()) {
                    Row row = iterator.next();
                    String params = mappingTable.getSourceTable().getParams();
                    JSONArray array = JSON.parseArray(params);
                    if (array == null || array.size() <= 0 || params.contains("<") || params.contains("=")) {
                        continue;
                    }
                    List<JSONObject> newParam = new ArrayList<JSONObject>();
                    for (int j = 0; j < array.size(); ++j) {
                        JSONObject jsonObject = array.getJSONObject(j);
                        for (int k = 0; k < fieldNames.length; ++k) {
                            if (!fieldNames[k].equalsIgnoreCase(jsonObject.getString("fieldName"))) {
                                continue;
                            }
                            String value = row.getField(k).toString();
                            if (DaapCaptureConfigServiceImpl.isISO8601DateTime(value)) {
                                // Store timestamps in the "yyyy-MM-dd HH:mm:ss" form.
                                LocalDateTime dateTime = LocalDateTime.parse(value, DateTimeFormatter.ISO_DATE_TIME);
                                jsonObject.put("criteria", (Object)dateTime.format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
                            } else {
                                jsonObject.put("criteria", (Object)value);
                            }
                            newParam.add(jsonObject);
                        }
                    }
                    DaapCaptureSourceTableEntity daapCaptureSourceTableEntity = new DaapCaptureSourceTableEntity();
                    daapCaptureSourceTableEntity.setId(mappingTable.getSourceTable().getId());
                    daapCaptureSourceTableEntity.setParams(newParam.toString());
                    this.daapCaptureSourceTableService.updateById(daapCaptureSourceTableEntity);
                }
            }
        }
    }

    /**
     * Tells whether the text can be parsed as a {@link LocalDateTime} using
     * {@link DateTimeFormatter#ISO_DATE_TIME}.
     *
     * @param dateTimeString candidate text; may be null
     * @return {@code true} if parsing succeeds, {@code false} on any failure (including null input)
     */
    public static boolean isISO8601DateTime(String dateTimeString) {
        boolean parsable;
        try {
            DateTimeFormatter.ISO_DATE_TIME.parse(dateTimeString, LocalDateTime::from);
            parsable = true;
        }
        catch (Exception ignored) {
            // Any failure, whether a parse error or null text, means "not an ISO-8601 date-time".
            parsable = false;
        }
        return parsable;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Adds a nullable {@code sync_time} datetime column to the given table in the sink database.
     * A "Duplicate ..." error from the database is taken to mean the column already exists and
     * is ignored, so the method can be called repeatedly for the same table.
     *
     * <p>When the {@code connector} property contains "CDC", the properties are first converted
     * through {@code DataTypeChang}; {@code username}, {@code password} and {@code url} are then
     * read from the result to open the JDBC connection.
     *
     * @param tableName        table to alter
     * @param sinkConnectorStr sink connector properties as a JSON object
     * @throws Exception if no connection can be obtained or the ALTER TABLE fails for any reason
     *                   other than a duplicate column
     */
    private void updateCreatTable(String tableName, String sinkConnectorStr) throws Exception {
        JSONObject dataSourceProperties = JSON.parseObject((String)sinkConnectorStr);
        String connectorType = dataSourceProperties.getString("connector").toUpperCase();
        if (connectorType.contains("CDC")) {
            dataSourceProperties = JSON.parseObject((String)JSON.toJSONString(this.daapConnectorService.DataTypeChang(dataSourceProperties, connectorType)));
        }
        String sql = "ALTER TABLE " + "`" + tableName + "` ADD COLUMN `sync_time`  datetime NULL";
        // try-with-resources skips a null resource, and otherwise guarantees the connection is
        // closed even when creating or closing the statement fails.
        try (Connection conn = JdbcUtils.getConn(dataSourceProperties.getString("username"), dataSourceProperties.getString("password"), dataSourceProperties.getString("url"))) {
            if (conn == null) {
                throw new Exception("\u8bf7\u68c0\u67e5\u6570\u636e\u6e90\u94fe\u63a5\u914d\u7f6e");
            }
            try (Statement stmt = conn.createStatement()) {
                stmt.executeUpdate(sql);
            }
            catch (SQLException e) {
                String message = e.getMessage();
                if (message != null && message.contains("Duplicate")) {
                    // Column was added by an earlier run; nothing to do.
                    return;
                }
                this.log.error("Failed to add sync_time column to table " + tableName + ": " + message, e);
                throw new Exception(e);
            }
        }
    }
}

