Sfoglia il codice sorgente

Merge branch 'dev-1.13.0' of http://192.168.1.220:10080/EHR/svr-quota into dev-1.13.0

jkzlzhoujie 6 anni fa
parent
commit
833a3a8202

+ 27 - 0
src/main/java/com/yihu/quota/service/cube/ElasticSearchDataProcessService.java

@ -40,6 +40,7 @@ public class ElasticSearchDataProcessService {
    private static String rowKey_k = "rowkey";
    private static String profileId_k = "profile_id";
    private static String action_k = "action";
    private static String action_delAll = "delAll";
    @Autowired
    private JdbcBasicService jdbcBasicService;
@ -51,6 +52,8 @@ public class ElasticSearchDataProcessService {
    private DataSourcesTableFieldService dataSourcesTableFieldService;
    @Autowired
    private ElasticSearchUtil elasticSearchUtil;
    @Autowired
    private CubeService cubeService;
    /**
     *
@ -188,6 +191,17 @@ public class ElasticSearchDataProcessService {
                }else {
                    throw new Exception("视图,表不存在");
                }
            } else if (action.contains(action_delAll)) {
                String cubeId = "";
                if (dataMap.containsKey("cubeId")) {
                    cubeId = dataMap.remove("cubeId").toString();
                    String index = this.getIndexTypeById(cubeId);
                    try {
                        elasticSearchUtil.remove(index);
                    } catch (Exception e) {
                        throw new  Exception("索引不存在");
                    }
                }
            }
        }catch (ParseException e){
            logger.debug("elasticSearch 执行失败");
@ -465,4 +479,17 @@ public class ElasticSearchDataProcessService {
        return "";
    }
    /**
     * 根据数据集id获取索引名和索引类型
     * @param cubeId
     * @return
     */
    public String getIndexTypeById(String cubeId) {
        String index = "";
        Cube cube = cubeService.findOne(Integer.parseInt(cubeId));
        if (null != cube) {
            index = cube.getIndexName();
        }
        return index;
    }
}

+ 66 - 10
src/main/java/com/yihu/quota/service/job/SingleTableJob.java

@ -1,10 +1,10 @@
package com.yihu.quota.service.job;
import com.google.gson.Gson;
import com.yihu.ehr.util.datetime.DateUtil;
import com.yihu.quota.contants.JobConstant;
import com.yihu.quota.kafka.Producer;
import com.yihu.quota.util.sql.DbKit;
import org.apache.commons.lang.StringUtils;
import org.quartz.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -14,7 +14,6 @@ import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Component;
import org.springframework.web.context.support.SpringBeanAutowiringSupport;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -36,6 +35,11 @@ import java.util.Map;
public class SingleTableJob implements Job {
    static private Logger logger = LoggerFactory.getLogger(SingleTableJob.class);
    /**
     * 数据来源库
     */
    protected String database;
    /**
     * 数据来源表
     */
@ -75,6 +79,16 @@ public class SingleTableJob implements Job {
     */
    protected JobConstant.ExecType execType;
    /**
     * 查询列
     */
    protected String searchColumn;
    /**
     * 数据集id
     */
    protected String cubeId;
    @Autowired
    private Producer producer;
    @Autowired
@ -87,11 +101,8 @@ public class SingleTableJob implements Job {
        cleanData();
        List<Map<String, Object>> list;
        do {
            list = fetch();
            saveData(list);
        } while (list != null && list.size() != 0);
        list = fetch();
        saveData(list);
    }
    private void prepare(JobExecutionContext jobExecutionContext) {
@ -99,6 +110,7 @@ public class SingleTableJob implements Job {
        SpringBeanAutowiringSupport.processInjectionBasedOnCurrentContext(this);
        JobDataMap jobDataMap = jobExecutionContext.getJobDetail().getJobDataMap();
        database = jobDataMap.getString("database");
        table = jobDataMap.getString("table");
        primeKey = jobDataMap.getString("primeKey");
        filterField = jobDataMap.getString("filterField");
@ -106,16 +118,22 @@ public class SingleTableJob implements Job {
        size = jobDataMap.getString("size");
        start = jobDataMap.getString("start");
        end = jobDataMap.getString("end");
        execType = JobConstant.ExecType.fromInt(jobDataMap.getIntValue("execType"));
        searchColumn = jobDataMap.getString("searchColumn");
        cubeId = jobDataMap.getString("cubeId");
    }
    private void cleanData() {
        if (JobConstant.ExecType.Full.equals(execType)) {
            Map<String, Object> dataMap = new HashMap<>(2);
            dataMap.put("database", database);
            dataMap.put("table", table);
            dataMap.put("delAll", true);
            dataMap.put("action", "delAll");
            dataMap.put("cubeId", cubeId);
            Gson gson = new Gson();
            String jsonData = gson.toJson(dataMap);
            logger.info("清除消息:{}",jsonData);
            producer.sendMessage(Producer.sepTopic, jsonData);
        }
    }
@ -128,6 +146,7 @@ public class SingleTableJob implements Job {
        list.forEach(item -> {
            Map<String, Object> dataMap = new HashMap<>(item.size());
            dataMap.put("database", database);
            dataMap.put("table", table);
            item.forEach((key, value) -> {
                if (key.equals(primeKey)) {
@ -140,18 +159,55 @@ public class SingleTableJob implements Job {
            Gson gson = new Gson();
            String jsonData = gson.toJson(dataMap);
            logger.info("保存消息:{}",jsonData);
            producer.sendMessage(Producer.sepTopic, jsonData);
        });
    }
    private List<Map<String, Object>> fetch() {
        StringBuilder sb = new StringBuilder();
        if (StringUtils.isNotEmpty(searchColumn)) {
            sb.append("select ").append(primeKey).append(",").append(searchColumn).append(" from ").append(database).append(".").append(table);
        } else {
            sb.append("select * from ").append(database).append(".").append(table);
        }
        if (StringUtils.isNotEmpty(filterField) && (StringUtils.isNotEmpty(start) || StringUtils.isNotEmpty(end))) {
            sb.append(" where ");
            if ("number".equals(filterFieldType)) {
                if (StringUtils.isNotEmpty(start) && StringUtils.isNotEmpty(end)) {
                    sb.append(filterField).append(">=").append(start).append(" and ").append(filterField).append("<=").append(end);
                } else if (StringUtils.isNotEmpty(start) && StringUtils.isEmpty(end)) {
                    sb.append(filterField).append(">=").append(start);
                } else if (StringUtils.isEmpty(start) && StringUtils.isNotEmpty(end)) {
                    sb.append(filterField).append("<=").append(end);
                }
            } else if ("date".equals(filterFieldType)) {
                if (StringUtils.isNotEmpty(start) && StringUtils.isNotEmpty(end)) {
                    sb.append(filterField).append(">=").append(DbKit.use().getLongDate(start)).append(" and ")
                            .append(filterField).append("<=").append(DbKit.use().getLongDate(end));
                } else if (StringUtils.isNotEmpty(start) && StringUtils.isEmpty(end)) {
                    sb.append(filterField).append(">=").append(DbKit.use().getLongDate(start));
                } else if (StringUtils.isEmpty(start) && StringUtils.isNotEmpty(end)) {
                    sb.append(filterField).append("<=").append(DbKit.use().getLongDate(end));
                }
            } else {
                logger.warn("不支持的过滤字段类型");
                return null;
            }
        }
        logger.info("sql={}",sb.toString());
        return jdbcTemplate.queryForList(sb.toString());
    }
    /**
     * TODO:没有设置数据库来源和数据库类型,当前使用默认数据库
     *
     * @return
     */
    private List<Map<String, Object>> fetch() {
    /*private List<Map<String, Object>> fetch() {
        String sql = "";
        if ("number".equals(filterFieldType)) {
            Long lngTemp = Long.parseLong(start) + Long.parseLong(size);
@ -181,5 +237,5 @@ public class SingleTableJob implements Job {
        }
        return jdbcTemplate.queryForList(sql);
    }
    }*/
}