|
@ -1,5 +1,6 @@
|
|
|
package com.yihu.wlyy.statistics.job.business;
|
|
|
|
|
|
import com.yihu.wlyy.statistics.dao.QuartzJobConfigDao;
|
|
|
import com.yihu.wlyy.statistics.dao.QuartzJobLogDao;
|
|
|
import com.yihu.wlyy.statistics.dao.WlyyDimensionQuotaDao;
|
|
|
import com.yihu.wlyy.statistics.etl.cache.Cache;
|
|
@ -10,6 +11,7 @@ import com.yihu.wlyy.statistics.etl.filter.FilterHelper;
|
|
|
import com.yihu.wlyy.statistics.etl.save.SaveHelper;
|
|
|
import com.yihu.wlyy.statistics.etl.save.es.ElasticFactory;
|
|
|
import com.yihu.wlyy.statistics.model.dimension.WlyyDimensionQuota;
|
|
|
import com.yihu.wlyy.statistics.model.job.QuartzJobConfig;
|
|
|
import com.yihu.wlyy.statistics.model.job.QuartzJobLog;
|
|
|
import com.yihu.wlyy.statistics.util.DateUtil;
|
|
|
import com.yihu.wlyy.statistics.util.SpringUtil;
|
|
@ -54,7 +56,8 @@ public class MysqlToEsQuotaJob implements Job {
|
|
|
|
|
|
private Logger logger = LoggerFactory.getLogger(MysqlToEsQuotaJob.class);
|
|
|
|
|
|
private WlyyJobConfigVO wlyyJobConfigVO;//指标对象
|
|
|
private String wlyyJobCongId;//指标对象
|
|
|
private QuartzJobConfig quartzJobConfig;//指标对象
|
|
|
private String endTime;//结束时间
|
|
|
private String startTime;//开始时间
|
|
|
private String year;//要统计的年份
|
|
@ -65,6 +68,8 @@ public class MysqlToEsQuotaJob implements Job {
|
|
|
private QuartzJobLogDao quartzJobLogDao;//执行日志Dao
|
|
|
@Autowired
|
|
|
private WlyyDimensionQuotaDao dimensionQuotaDao;
|
|
|
@Autowired
|
|
|
private QuartzJobConfigDao quartzJobConfigDao;
|
|
|
|
|
|
@Autowired
|
|
|
private ElasticFactory elasticFactory;
|
|
@ -115,7 +120,7 @@ public class MysqlToEsQuotaJob implements Job {
|
|
|
if ("2".equals(timeLevel)) {
|
|
|
//按年度到达量
|
|
|
startTime = this.year + "-06-30 17:00:00";
|
|
|
}else{
|
|
|
} else {
|
|
|
//增量
|
|
|
if (StringUtils.isEmpty(startTime)) {
|
|
|
startTime = new LocalDate(new DateTime().minusDays(2)).toString("yyyy-MM-dd") + " 17:00:00"; //2017-06-01 17:00:00
|
|
@ -126,8 +131,8 @@ public class MysqlToEsQuotaJob implements Job {
|
|
|
|
|
|
|
|
|
this.quotaDate = DateUtil.strToDate(endTime, "yyyy-MM-dd");
|
|
|
|
|
|
this.wlyyJobConfigVO = (WlyyJobConfigVO) map.get("jobConfig");
|
|
|
this.wlyyJobCongId = map.getString("jobConfig");
|
|
|
this.quartzJobConfig=quartzJobConfigDao.findById(wlyyJobCongId);
|
|
|
}
|
|
|
|
|
|
/**
|
|
@ -135,19 +140,19 @@ public class MysqlToEsQuotaJob implements Job {
|
|
|
*/
|
|
|
private void computequota() {
|
|
|
try {
|
|
|
logger.info("========================quotaCode:" + wlyyJobConfigVO.getId() + ","+DateUtil.dateToStr(quotaDate, "yyyy-MM-dd")+",timeLevel:"+timeLevel+" start========================");
|
|
|
logger.info("========================quotaCode:" + wlyyJobCongId + "," + DateUtil.dateToStr(quotaDate, "yyyy-MM-dd") + ",timeLevel:" + timeLevel + " start========================");
|
|
|
QuartzJobLog tjQuotaLog = new QuartzJobLog();
|
|
|
tjQuotaLog.setJobId(wlyyJobConfigVO.getId());
|
|
|
tjQuotaLog.setJobId(wlyyJobCongId);
|
|
|
tjQuotaLog.setJobStartTime(new Date());
|
|
|
|
|
|
// 0 删除这天的数据
|
|
|
deleteData(quotaDate, wlyyJobConfigVO.getId(), timeLevel);
|
|
|
deleteData(quotaDate, wlyyJobCongId, timeLevel);
|
|
|
//1..抽取数据 如果是累加就是 List<DataModel> 如果是相除 Map<String,List<DataModel>>
|
|
|
List<DataModel> dataModels = extract();
|
|
|
//2..根据规则过滤数据
|
|
|
FilterModel filterModel = filter(dataModels);
|
|
|
//得到该指标的维度
|
|
|
List<WlyyDimensionQuota> dimensionQuotas = dimensionQuotaDao.findDimensionQuotasByQuotaCode(wlyyJobConfigVO.getId());
|
|
|
List<WlyyDimensionQuota> dimensionQuotas = dimensionQuotaDao.findDimensionQuotasByQuotaCode(wlyyJobCongId);
|
|
|
//2.1.从维度的key转换
|
|
|
if (dimensionQuotas != null && dimensionQuotas.size() > 0) {
|
|
|
filterModel = convert(filterModel, dimensionQuotas);
|
|
@ -161,7 +166,7 @@ public class MysqlToEsQuotaJob implements Job {
|
|
|
tjQuotaLog.setJobEndTime(new Date());
|
|
|
tjQuotaLog.setJobContent(JSONArray.fromObject(filterModel.getErrorModels()).toString());
|
|
|
saveLog(tjQuotaLog);
|
|
|
logger.info("========================quotaCode:" + wlyyJobConfigVO.getId() + ","+DateUtil.dateToStr(quotaDate, "yyyy-MM-dd")+" end========================");
|
|
|
logger.info("========================quotaCode:" + wlyyJobCongId + "," + DateUtil.dateToStr(quotaDate, "yyyy-MM-dd") + " end========================");
|
|
|
} catch (Exception e) {
|
|
|
e.printStackTrace();
|
|
|
}
|
|
@ -241,7 +246,7 @@ public class MysqlToEsQuotaJob implements Job {
|
|
|
*/
|
|
|
private List<SaveModel> compute(List<DataModel> dataModels, List<WlyyDimensionQuota> dimensionQuotas, String timeLevel) {
|
|
|
try {
|
|
|
return SpringUtil.getBean(ComputeHelper.class).compute(dataModels, dimensionQuotas, wlyyJobConfigVO, endTime, timeLevel);
|
|
|
return SpringUtil.getBean(ComputeHelper.class).compute(dataModels, dimensionQuotas, wlyyJobCongId, endTime, timeLevel);
|
|
|
} catch (Exception e) {
|
|
|
logger.error("compute error:" + e.getMessage());
|
|
|
}
|
|
@ -274,21 +279,21 @@ public class MysqlToEsQuotaJob implements Job {
|
|
|
try {
|
|
|
List<DataModel> dataModels = null;
|
|
|
//先判断指标是否支持缓存
|
|
|
if (StringUtils.isEmpty(wlyyJobConfigVO.getCacheKey())) {
|
|
|
if (StringUtils.isEmpty(quartzJobConfig.getCacheKey())) {
|
|
|
//不支持直接去数据库拿
|
|
|
dataModels = SpringUtil.getBean(ExtractHelper.class).extractData(wlyyJobConfigVO, startTime, endTime, year, timeLevel);
|
|
|
dataModels = SpringUtil.getBean(ExtractHelper.class).extractData(quartzJobConfig, startTime, endTime, year, timeLevel);
|
|
|
} else {
|
|
|
//缓存的key 是 时间+timelevel+key
|
|
|
StringBuffer bu = new StringBuffer(DateUtil.dateToStr(quotaDate, "yyyy-MM-dd") + "-" + timeLevel + "-" + wlyyJobConfigVO.getCacheKey());
|
|
|
StringBuffer bu = new StringBuffer(DateUtil.dateToStr(quotaDate, "yyyy-MM-dd") + "-" + timeLevel + "-" +quartzJobConfig.getCacheKey());
|
|
|
//支持的话判断缓存有没有值
|
|
|
dataModels = Cache.getCache(bu.toString());
|
|
|
if (dataModels == null) {
|
|
|
//如果缓存是空的那么直接数据库拿 在放入缓存
|
|
|
dataModels = SpringUtil.getBean(ExtractHelper.class).extractData(wlyyJobConfigVO, startTime, endTime, year, timeLevel);
|
|
|
dataModels = SpringUtil.getBean(ExtractHelper.class).extractData(quartzJobConfig, startTime, endTime, year, timeLevel);
|
|
|
Cache.addCache(bu.toString(), dataModels);
|
|
|
}
|
|
|
}
|
|
|
logger.info("quotaCode:" + wlyyJobConfigVO.getId() + ",size:" + dataModels.size());
|
|
|
logger.info("quotaCode:" + wlyyJobCongId + ",size:" + dataModels.size());
|
|
|
return dataModels;
|
|
|
} catch (Exception e) {
|
|
|
logger.error("extract error:" + e.getMessage());
|