package com.yihu.quota.service.job;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.gson.Gson;
import com.yihu.quota.contants.JobConstant;
import com.yihu.quota.kafka.Producer;
import com.yihu.quota.util.sql.DbKit;
import org.apache.commons.lang.StringUtils;
import org.quartz.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Scope;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Component;
import org.springframework.web.context.support.SpringBeanAutowiringSupport;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
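/**
* Job for the case where a single table is itself one multidimensional data set (cube),
* e.g. data collection from the organization table.
* <p>
* Requirements for a table to be collected; a table that does not meet them must be
* reworked before data collection:
* <ul>
* <li>The table must have a single-column unique key (or primary key); composite unique
* keys (or primary keys) are not supported.</li>
* <li>Only a single filter field is supported; filtering on multiple fields is not.</li>
* <li>The filter field must be a time or numeric field; other field types are not
* supported.</li>
* </ul>
*
* @author l4qiang
* @date 2018-09-18
*/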
@Component
@Scope("prototype")
@DisallowConcurrentExecution
public class SingleTableJob implements Job {
/**
* SLF4J logger for this class.
*/
static private Logger logger = LoggerFactory.getLogger(SingleTableJob.class);
/**
* Source database the data is read from.
*/
protected String database;
/**
* Source table the data is read from.
*/
protected String table;
/**
* Primary key of the source table.
*/
protected String primeKey;
/**
* Filter field.
*/
protected String filterField;
/**
* Type of the filter field.
*/
protected String filterFieldType;
/**
* Step size used when filtering data.
*/
protected String size;
/**
* Start time.
*/
protected String start;
/**
* End time.
*/
protected String end;
/**
* Execution action; per the original note: 1 = manual execution, 2 = periodic execution.
*/
protected JobConstant.ExecType execType;
/**
* Column(s) to query.
*/
protected String searchColumn;
/**
* Id of the data set (cube).
*/
protected String cubeId;
/**
* Initialization mode: full-table data or column data.
* execute() compares this value against "table" and, when it matches, calls
* cleanData() before doing anything else.
* NOTE(review): the original comment spells the column-mode value "cloumn";
* confirm the exact runtime string against the code that sets this field.
*/
protected String initializeType;
/**
* Injected Producer (com.yihu.quota.kafka.Producer).
*/
@Autowired
private Producer producer;
/**
* Injected JdbcTemplate; execute() uses it to run the row-count query.
*/
@Autowired
private JdbcTemplate jdbcTemplate;
/**
* Injected Jackson ObjectMapper.
*/
@Autowired
private ObjectMapper objectMapper;
@Override
public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
prepare(jobExecutionContext);
if(initializeType.equals("table")){
boolean cleanFlag = cleanData();
if( !cleanFlag ){
return;
}
}
String sql = sqlGenerate();
String[] countSql = sql.split("from");
sql = "select count(*) from " + countSql[1];
try {
int rows = jdbcTemplate.queryForObject(sql, Integer.class);
int perCount = 10000;
if (rows > perCount) {
int count = rows / perCount;
int remainder = rows % perCount;
if (remainder != 0) {
count++;
} else {
remainder = perCount;
}
for (int i = 0; i < count; i++) {
int row,start = 0;
if (i != 0) {
start = i * perCount;
}
// 确定抽取多少条数据
if (i + 1 == count) {
row = remainder;
} else {
row = perCount;
}
List