|
@ -10,6 +10,7 @@ import com.yihu.wlyy.statistics.etl.cache.Cache;
|
|
|
import com.yihu.wlyy.statistics.etl.compute.ComputeHelper;
|
|
|
import com.yihu.wlyy.statistics.etl.convert.ConvertHelper;
|
|
|
import com.yihu.wlyy.statistics.etl.extract.ExtractHelper;
|
|
|
import com.yihu.wlyy.statistics.etl.extract.db.Data2Save;
|
|
|
import com.yihu.wlyy.statistics.etl.filter.FilterHelper;
|
|
|
import com.yihu.wlyy.statistics.etl.save.SaveHelper;
|
|
|
import com.yihu.wlyy.statistics.etl.save.es.ElasticFactory;
|
|
@ -77,6 +78,8 @@ public class CurrentMysqlToEsQuotaJob implements Job {
|
|
|
private QuartzJobConfigDao quartzJobConfigDao;
|
|
|
@Autowired
|
|
|
private StringRedisTemplate redisTemplate;
|
|
|
@Autowired
|
|
|
private Data2Save data2Save;
|
|
|
|
|
|
|
|
|
public void execute(JobExecutionContext context)
|
|
@ -135,31 +138,11 @@ public class CurrentMysqlToEsQuotaJob implements Job {
|
|
|
List<QuartzJobConfig> list = quartzJobConfigDao.findByIds();
|
|
|
list.stream().forEach(one -> {
|
|
|
try {
|
|
|
logger.info("========================quotaCode:" + one.getId() + "," + DateUtil.dateToStr(quotaDate, "yyyy-MM-dd") + ",timeLevel:" + timeLevel + " start========================");
|
|
|
QuartzJobLog tjQuotaLog = new QuartzJobLog();
|
|
|
tjQuotaLog.setJobId(one.getId());
|
|
|
tjQuotaLog.setJobStartTime(new Date());
|
|
|
|
|
|
//1..抽取数据 如果是累加就是 List<DataModel> 如果是相除 Map<String,List<DataModel>>
|
|
|
List<DataModel> dataModels = extract(one);
|
|
|
//2..根据规则过滤数据
|
|
|
FilterModel filterModel = filter(dataModels);
|
|
|
//得到改指标的维度
|
|
|
List<WlyyDimensionQuota> dimensionQuotas = dimensionQuotaDao.findDimensionQuotasByQuotaCode(one.getId());
|
|
|
//2.1.从维度的key转换
|
|
|
if (dimensionQuotas != null && dimensionQuotas.size() > 0) {
|
|
|
filterModel = convert(filterModel, dimensionQuotas);
|
|
|
if (StringUtils.isEmpty(one.getExtractType()) || "1".equals(one.getExtractType())){
|
|
|
mysql(one);
|
|
|
}else if("2".equals(one.getExtractType())){
|
|
|
es(one);
|
|
|
}
|
|
|
//3.统计数据
|
|
|
List<SaveModel> sms = compute(filterModel.getData(), dimensionQuotas, timeLevel, one);
|
|
|
//4.更新数据
|
|
|
Boolean success = updateData(sms, quotaDate, one.getId(), timeLevel, dimensionQuotas.size());
|
|
|
|
|
|
tjQuotaLog.setJobType(success ? "1" : "0");
|
|
|
tjQuotaLog.setJobEndTime(new Date());
|
|
|
tjQuotaLog.setJobContent(JSONArray.fromObject(filterModel.getErrorModels()).toString());
|
|
|
saveLog(tjQuotaLog);
|
|
|
logger.info("========================quotaCode:" + one.getId() + "," + DateUtil.dateToStr(quotaDate, "yyyy-MM-dd") + " end========================");
|
|
|
} catch (Exception e) {
|
|
|
e.printStackTrace();
|
|
|
}
|
|
@ -169,15 +152,89 @@ public class CurrentMysqlToEsQuotaJob implements Job {
|
|
|
|
|
|
}
|
|
|
|
|
|
private void es(QuartzJobConfig one) throws Exception{
|
|
|
this.endTime = DateUtil.getStringDate("yyyy-MM-dd'T'HH:mm:ssZZZ");
|
|
|
//初始化统计年份
|
|
|
this.year = getNowYearByDate();
|
|
|
//初始化开始时间
|
|
|
if ("2".equals(timeLevel)) {
|
|
|
//按年度到达量
|
|
|
startTime = this.year + "-06-30T17:00:00+0800";
|
|
|
} else {
|
|
|
//增量
|
|
|
this.startTime = new LocalDate(new DateTime().minusDays(1)).toString("yyyy-MM-dd") + "T17:00:00+0800"; //2017-06-01 17:00:00
|
|
|
}
|
|
|
this.quotaDate = DateUtil.strToDate(endTime, "yyyy-MM-dd");
|
|
|
|
|
|
|
|
|
one.setStartTime(startTime);
|
|
|
one.setEndTime(endTime);
|
|
|
logger.info("========================quotaCode:" + one.getQuotaId() + "," + DateUtil.dateToStr(quotaDate, "yyyy-MM-dd") + ",timeLevel:" + timeLevel + " start========================");
|
|
|
QuartzJobLog tjQuotaLog = new QuartzJobLog();
|
|
|
tjQuotaLog.setJobId( one.getQuotaId());
|
|
|
tjQuotaLog.setJobStartTime(new Date());
|
|
|
// 1..抽取数据 如果是累加就是 List<DataModel> 如果是相除 Map<String,List<DataModel>>
|
|
|
List<DataModel> dataModels = extract2Es(one);
|
|
|
// 2 DataModel 转SaveModel即可
|
|
|
List<SaveModel> saveModels = data2Save.data2save(dataModels,one,quotaDate,timeLevel);
|
|
|
// 3.更新数据
|
|
|
Boolean success = updateData(saveModels, quotaDate, one.getId(), timeLevel);
|
|
|
|
|
|
tjQuotaLog.setJobType(success ? "1" : "0");
|
|
|
tjQuotaLog.setJobEndTime(new Date());
|
|
|
//tjQuotaLog.setJobContent(JSONArray.fromObject(filterModel.getErrorModels()).toString());
|
|
|
saveLog(tjQuotaLog);
|
|
|
logger.info("========================quotaCode:" + one.getQuotaId() + "," + DateUtil.dateToStr(quotaDate, "yyyy-MM-dd") + " end========================");
|
|
|
|
|
|
}
|
|
|
|
|
|
private void mysql(QuartzJobConfig one) {
|
|
|
logger.info("========================quotaCode:" + one.getId() + "," + DateUtil.dateToStr(quotaDate, "yyyy-MM-dd") + ",timeLevel:" + timeLevel + " start========================");
|
|
|
QuartzJobLog tjQuotaLog = new QuartzJobLog();
|
|
|
tjQuotaLog.setJobId(one.getId());
|
|
|
tjQuotaLog.setJobStartTime(new Date());
|
|
|
|
|
|
//1..抽取数据 如果是累加就是 List<DataModel> 如果是相除 Map<String,List<DataModel>>
|
|
|
List<DataModel> dataModels = extract2Es(one);
|
|
|
//2..根据规则过滤数据
|
|
|
FilterModel filterModel = filter(dataModels);
|
|
|
//得到改指标的维度
|
|
|
List<WlyyDimensionQuota> dimensionQuotas = dimensionQuotaDao.findDimensionQuotasByQuotaCode(one.getId());
|
|
|
//2.1.从维度的key转换
|
|
|
if (dimensionQuotas != null && dimensionQuotas.size() > 0) {
|
|
|
filterModel = convert(filterModel, dimensionQuotas);
|
|
|
}
|
|
|
//3.统计数据
|
|
|
List<SaveModel> sms = compute(filterModel.getData(), dimensionQuotas, timeLevel, one);
|
|
|
//4.更新数据
|
|
|
Boolean success = updateData(sms, quotaDate, one.getId(), timeLevel);
|
|
|
|
|
|
tjQuotaLog.setJobType(success ? "1" : "0");
|
|
|
tjQuotaLog.setJobEndTime(new Date());
|
|
|
tjQuotaLog.setJobContent(JSONArray.fromObject(filterModel.getErrorModels()).toString());
|
|
|
saveLog(tjQuotaLog);
|
|
|
logger.info("========================quotaCode:" + one.getId() + "," + DateUtil.dateToStr(quotaDate, "yyyy-MM-dd") + " end========================");
|
|
|
}
|
|
|
|
|
|
private List<DataModel> extract2Es(QuartzJobConfig one) {
|
|
|
try {
|
|
|
List<DataModel> dataModels = SpringUtil.getBean(ExtractHelper.class).extractData(one, startTime, endTime, year, timeLevel);
|
|
|
logger.info("quotaCode:" + one.getQuotaId() + ",size:" + dataModels.size());
|
|
|
return dataModels;
|
|
|
} catch (Exception e) {
|
|
|
logger.error("extract error:" + e.getMessage());
|
|
|
}
|
|
|
return null;
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* 删除 某个指标某一天的某个timelevel的数据
|
|
|
*
|
|
|
* @param quotaDate
|
|
|
* @param quotaCode
|
|
|
* @param timeLevel
|
|
|
* @param size
|
|
|
*/
|
|
|
private boolean updateData(List<SaveModel> sms, Date quotaDate, String quotaCode, String timeLevel, int size) {
|
|
|
private boolean updateData(List<SaveModel> sms, Date quotaDate, String quotaCode, String timeLevel) {
|
|
|
JestClient jestClient = null;
|
|
|
try {
|
|
|
jestClient = elasticFactory.getJestClient();
|
|
@ -345,4 +402,9 @@ public class CurrentMysqlToEsQuotaJob implements Job {
|
|
|
return (1900 + today.getYear() - 1) + "";
|
|
|
}
|
|
|
}
|
|
|
|
|
|
public static void main(String[] args) {
|
|
|
SimpleDateFormat s=new SimpleDateFormat("");
|
|
|
System.out.println(s.format(new Date()));
|
|
|
}
|
|
|
}
|