浏览代码

数据解析 增加 数据关联查询

jkzlzhoujie 6 年之前
父节点
当前提交
46652e8257
共有 1 个文件被更改,包括 21 次插入5 次删除
  1. 21 5
      src/main/java/com/yihu/quota/service/job/SingleTableJob.java

+ 21 - 5
src/main/java/com/yihu/quota/service/job/SingleTableJob.java

@ -91,6 +91,8 @@ public class SingleTableJob implements Job {
     */
    protected String cubeId;
    protected String initializeType;//初始化方式 全表数据 table  列数据 cloumn
    @Autowired
    private Producer producer;
    @Autowired
@ -102,8 +104,12 @@ public class SingleTableJob implements Job {
    public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
        prepare(jobExecutionContext);
        cleanData();
        if(initializeType.equals("table")){
            boolean cleanFlag =  cleanData();
            if( !cleanFlag ){
                return;
            }
        }
        String sql = sqlGenerate();
        String[] countSql = sql.split("from");
        sql = "select count(*) from " + countSql[1];
@ -146,6 +152,8 @@ public class SingleTableJob implements Job {
        SpringBeanAutowiringSupport.processInjectionBasedOnCurrentContext(this);
        JobDataMap jobDataMap = jobExecutionContext.getJobDetail().getJobDataMap();
        initializeType = jobDataMap.getString("initializeType");
        database = jobDataMap.getString("database");
        table = jobDataMap.getString("table");
        primeKey = jobDataMap.getString("primeKey");
@ -163,7 +171,10 @@ public class SingleTableJob implements Job {
        cubeId = jobDataMap.getString("cubeId");
    }
    private void cleanData() {
    /**
     * 清空数据 发送消息
     */
    private boolean cleanData() {
        if (JobConstant.ExecType.Full.equals(execType)) {
            Map<String, Object> dataMap = new HashMap<>(2);
            dataMap.put("dataSource", "mysql");
@ -175,11 +186,13 @@ public class SingleTableJob implements Job {
            try {
                String jsonData = objectMapper.writeValueAsString(dataMap);
                logger.info("清除消息:{}",jsonData);
                producer.sendMessage(Producer.sepTopic, jsonData);
               return producer.sendMessage(Producer.sepTopic, jsonData);
            } catch (JsonProcessingException e) {
                e.printStackTrace();
                return  false;
            }
        }
        return  true;
    }
    private void saveData(List<Map<String, Object>> list) {
@ -204,7 +217,10 @@ public class SingleTableJob implements Job {
                String jsonData = objectMapper.writeValueAsString(dataMap);
                Thread.sleep(50);
                logger.info("清除消息:{}",jsonData);
                producer.sendMessage(Producer.sepTopic, jsonData);
                boolean sendFlag = producer.sendMessage(Producer.sepTopic, jsonData);
                if( !sendFlag ){
                    return;
                }
            } catch (JsonProcessingException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {