123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339 |
- package com.yihu.quota.service.job;
- import com.fasterxml.jackson.core.JsonProcessingException;
- import com.fasterxml.jackson.databind.ObjectMapper;
- import com.google.gson.Gson;
- import com.yihu.quota.contants.JobConstant;
- import com.yihu.quota.kafka.Producer;
- import com.yihu.quota.util.sql.DbKit;
- import org.apache.commons.lang.StringUtils;
- import org.quartz.*;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.context.annotation.Scope;
- import org.springframework.jdbc.core.JdbcTemplate;
- import org.springframework.stereotype.Component;
- import org.springframework.web.context.support.SpringBeanAutowiringSupport;
- import java.util.ArrayList;
- import java.util.HashMap;
- import java.util.List;
- import java.util.Map;
- /**
- * 用于一个表就是一个多维数据集的情况,如组织机构表的数据采集
- * 数据采集表要求,不符合要求的表需要先改造后进行数据采集:
- * <p>
- * 表必须是单字段唯一键(或主键),不支持复合唯一键(或主键)
- * 过滤字段只支持单字段,不支持多字段过滤
- * 过滤字段只支持时间和数字字段,不支持其他类型字段
- *
- * @author l4qiang
- * @date 2018-09-18
- */
- @Component
- @Scope("prototype")
- @DisallowConcurrentExecution
- public class SingleTableJob implements Job {
- static private Logger logger = LoggerFactory.getLogger(SingleTableJob.class);
- /**
- * 数据来源库
- */
- protected String database;
- /**
- * 数据来源表
- */
- protected String table;
- /**
- * 表主键
- */
- protected String primeKey;
- /**
- * 过滤字段
- */
- protected String filterField;
- /**
- * 过滤字段类型
- */
- protected String filterFieldType;
- /**
- * 过滤数据步长
- */
- protected String size;
- /**
- * 开始时间
- */
- protected String start;
- /**
- * 结束时间
- */
- protected String end;
- /**
- * 执行动作 1 手动执行 2 周期执行
- */
- protected JobConstant.ExecType execType;
- /**
- * 查询列
- */
- protected String searchColumn;
- /**
- * 数据集id
- */
- protected String cubeId;
- protected String initializeType;//初始化方式 全表数据 table 列数据 cloumn
- @Autowired
- private Producer producer;
- @Autowired
- private JdbcTemplate jdbcTemplate;
- @Autowired
- private ObjectMapper objectMapper;
- @Override
- public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
- prepare(jobExecutionContext);
- if(initializeType.equals("table")){
- boolean cleanFlag = cleanData();
- if( !cleanFlag ){
- return;
- }
- }
- String sql = sqlGenerate();
- String[] countSql = sql.split("from");
- sql = "select count(*) from " + countSql[1];
- try {
- int rows = jdbcTemplate.queryForObject(sql, Integer.class);
- int perCount = 10000;
- if (rows > perCount) {
- int count = rows / perCount;
- int remainder = rows % perCount;
- if (remainder != 0) {
- count++;
- } else {
- remainder = perCount;
- }
- for (int i = 0; i < count; i++) {
- int row,start = 0;
- if (i != 0) {
- start = i * perCount;
- }
- // 确定抽取多少条数据
- if (i + 1 == count) {
- row = remainder;
- } else {
- row = perCount;
- }
- List<Map<String, Object>> list = fetch(start, row);
- saveData(list);
- }
- } else {
- List<Map<String, Object>> list = fetch(0, perCount);
- saveData(list);
- }
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- private void prepare(JobExecutionContext jobExecutionContext) {
- //spring注入
- SpringBeanAutowiringSupport.processInjectionBasedOnCurrentContext(this);
- JobDataMap jobDataMap = jobExecutionContext.getJobDetail().getJobDataMap();
- initializeType = jobDataMap.getString("initializeType");
- database = jobDataMap.getString("database");
- table = jobDataMap.getString("table");
- primeKey = jobDataMap.getString("primeKey");
- filterField = jobDataMap.getString("filterField");
- filterFieldType = jobDataMap.getString("filterFieldType");
- size = jobDataMap.getString("size");
- start = jobDataMap.getString("start");
- end = jobDataMap.getString("end");
- if(jobDataMap.get("execType") != null){
- execType = JobConstant.ExecType.fromInt(jobDataMap.getIntValue("execType"));
- }else {
- execType = JobConstant.ExecType.Full;
- }
- searchColumn = jobDataMap.getString("searchColumn");
- cubeId = jobDataMap.getString("cubeId");
- }
- /**
- * 清空数据 发送消息
- */
- private boolean cleanData() {
- if (JobConstant.ExecType.Full.equals(execType)) {
- Map<String, Object> dataMap = new HashMap<>(2);
- dataMap.put("dataSource", "mysql");
- dataMap.put("database", database);
- dataMap.put("table", table);
- dataMap.put("action", "DelAll");
- dataMap.put("cubeId", cubeId);
- try {
- String jsonData = objectMapper.writeValueAsString(dataMap);
- logger.info("清除消息:{}",jsonData);
- return producer.sendMessage(Producer.sepTopic, jsonData);
- } catch (JsonProcessingException e) {
- e.printStackTrace();
- return false;
- }
- }
- return true;
- }
- private void saveData(List<Map<String, Object>> list) {
- if (list == null) {
- logger.warn("未获取到数据");
- return;
- }
- Map<String, Object> dataMap = new HashMap<>();
- dataMap.put("database", database);
- dataMap.put("dataSource", "mysql");
- dataMap.put("action", "PutAll");
- dataMap.put("table", table);
- dataMap.put("cubeId", cubeId);
- List<Map<String,Object>> dataList = new ArrayList<>();
- int p = 1;
- int perCount = 200;
- int d = list.size()/perCount;
- int y = list.size()%perCount;
- for(int i = 0; i < list.size() ; i++){
- Map<String,Object> map = new HashMap<>();
- Map<String,Object> item = list.get(i);
- item.forEach((key, value) -> {
- if (key.equals(primeKey)) {
- map.put("rowkey", value);
- }
- map.put(key, value);
- });
- dataList.add(map);
- if(list.size() < perCount){
- dataMap.put("dataList", dataList);
- sendDataMessage(dataMap);
- dataList.clear();
- }else {
- if((i+1) == perCount*p){
- p++;
- dataMap.put("dataList", dataList);
- sendDataMessage(dataMap);
- dataList.clear();
- }else{
- //有余数时,最后一组数据
- if(d > 0 && y > 0 && i==list.size()-1){
- dataMap.put("dataList", dataList);
- sendDataMessage(dataMap);
- dataList.clear();
- }
- }
- }
- }
- }
- private boolean sendDataMessage(Map<String,Object> dataMap){
- try {
- String jsonData = objectMapper.writeValueAsString(dataMap);
- Thread.sleep(50);
- // logger.info("消息:{}",jsonData);
- return producer.sendMessage(Producer.sepTopic, jsonData);
- } catch (JsonProcessingException e) {
- e.printStackTrace();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- return false;
- }
- private List<Map<String, Object>> fetch(Integer start, Integer row) {
- String sql = sqlGenerate();
- sql += " limit " + start + "," + row;
- logger.info("sql={}",sql);
- return jdbcTemplate.queryForList(sql);
- }
- public String sqlGenerate() {
- StringBuilder sb = new StringBuilder();
- if (StringUtils.isNotEmpty(searchColumn)) {
- sb.append("select ").append(primeKey).append(",").append(searchColumn).append(" from ").append(database).append(".").append(table);
- } else {
- sb.append("select * from ").append(database).append(".").append(table);
- }
- if (StringUtils.isNotEmpty(filterField) && (StringUtils.isNotEmpty(start) || StringUtils.isNotEmpty(end))) {
- sb.append(" where ");
- if ("number".equals(filterFieldType)) {
- if (StringUtils.isNotEmpty(start) && StringUtils.isNotEmpty(end)) {
- sb.append(filterField).append(">=").append(start).append(" and ").append(filterField).append("<=").append(end);
- } else if (StringUtils.isNotEmpty(start) && StringUtils.isEmpty(end)) {
- sb.append(filterField).append(">=").append(start);
- } else if (StringUtils.isEmpty(start) && StringUtils.isNotEmpty(end)) {
- sb.append(filterField).append("<=").append(end);
- }
- } else if ("date".equals(filterFieldType)) {
- if (StringUtils.isNotEmpty(start) && StringUtils.isNotEmpty(end)) {
- sb.append(filterField).append(">=").append(DbKit.use().getLongDate(start)).append(" and ")
- .append(filterField).append("<=").append(DbKit.use().getLongDate(end));
- } else if (StringUtils.isNotEmpty(start) && StringUtils.isEmpty(end)) {
- sb.append(filterField).append(">=").append(DbKit.use().getLongDate(start));
- } else if (StringUtils.isEmpty(start) && StringUtils.isNotEmpty(end)) {
- sb.append(filterField).append("<=").append(DbKit.use().getLongDate(end));
- }
- } else {
- logger.warn("不支持的过滤字段类型");
- return null;
- }
- }
- return sb.toString();
- }
- /**
- * TODO:没有设置数据库来源和数据库类型,当前使用默认数据库
- *
- * @return
- */
- /*private List<Map<String, Object>> fetch() {
- String sql = "";
- if ("number".equals(filterFieldType)) {
- Long lngTemp = Long.parseLong(start) + Long.parseLong(size);
- String temp = lngTemp.toString();
- sql = "select * from " + table +
- " where " + filterField + ">=" + start + " and " + filterField + "<" + temp + " and " +
- filterField + "<=" + end;
- start = temp;
- } else if ("date".equals(filterFieldType)) {
- Date date = DateUtil.toDateFromTime(start);
- java.util.Calendar calendar = java.util.Calendar.getInstance();
- calendar.setTime(date);
- calendar.add(java.util.Calendar.HOUR_OF_DAY, Integer.parseInt(size));
- String temp = DateUtil.toString(calendar.getTime(), DateUtil.DEFAULT_YMDHMSDATE_FORMAT);
- sql = "select * from " + table +
- " where " + filterField + ">=" + DbKit.use().getLongDate(start) + " and " + filterField + "<" + DbKit.use().getLongDate(temp) + " and " +
- filterField + "<=" + DbKit.use().getLongDate(end);
- start = temp;
- } else {
- logger.warn("不支持的过滤字段类型");
- return null;
- }
- return jdbcTemplate.queryForList(sql);
- }*/
- }
|