|
@ -3,18 +3,19 @@ package com.yihu.jw.quota.job;
|
|
|
import com.yihu.jw.entity.ehr.quota.TjQuotaDataSource;
|
|
|
import com.yihu.jw.entity.ehr.quota.TjQuotaLog;
|
|
|
import com.yihu.jw.quota.dao.jpa.TjQuotaLogDao;
|
|
|
import com.yihu.jw.quota.etl.Contant;
|
|
|
import com.yihu.jw.quota.etl.extract.ExtractHelper;
|
|
|
import com.yihu.jw.quota.etl.model.EsConfig;
|
|
|
import com.yihu.jw.quota.etl.save.SaveHelper;
|
|
|
import com.yihu.jw.quota.etl.util.ElasticsearchUtil;
|
|
|
import com.yihu.jw.quota.service.source.TjDataSourceService;
|
|
|
import com.yihu.jw.quota.etl.Contant;
|
|
|
import com.yihu.jw.quota.util.SpringUtil;
|
|
|
import com.yihu.jw.quota.vo.QuotaVo;
|
|
|
import com.yihu.jw.quota.vo.SaveModel;
|
|
|
import com.yihu.jw.util.date.DateUtil;
|
|
|
import net.sf.json.JSONObject;
|
|
|
import org.elasticsearch.client.Client;
|
|
|
import org.elasticsearch.index.query.*;
|
|
|
import org.joda.time.DateTime;
|
|
|
import org.joda.time.LocalDate;
|
|
|
import org.quartz.*;
|
|
|
import org.slf4j.Logger;
|
|
@ -28,6 +29,7 @@ import org.springframework.transaction.annotation.Transactional;
|
|
|
import org.springframework.util.StringUtils;
|
|
|
import org.springframework.web.context.support.SpringBeanAutowiringSupport;
|
|
|
|
|
|
import java.text.SimpleDateFormat;
|
|
|
import java.util.Date;
|
|
|
import java.util.List;
|
|
|
import java.util.Map;
|
|
@ -46,6 +48,9 @@ public class EsQuotaJob implements Job {
|
|
|
private String endTime; // 结束时间
|
|
|
private String startTime; //开始时间
|
|
|
private String timeLevel; //时间
|
|
|
private String dataLevel;//1 数据增量 2数据到达量
|
|
|
private String year;//要统计的年份
|
|
|
private Date quotaDate;//统计的时间
|
|
|
private String executeFlag; // 执行动作 1 手动执行 2 周期执行
|
|
|
private int haveThreadCount = 0;//已完成线程数
|
|
|
private int threadCount = 1;//总线程数
|
|
@ -87,7 +92,7 @@ public class EsQuotaJob implements Job {
|
|
|
JSONObject obj = new JSONObject().fromObject(quotaDataSource.getConfigJson());
|
|
|
EsConfig esConfig = (EsConfig) JSONObject.toBean(obj,EsConfig.class);
|
|
|
//查询是否已经统计过,如果已统计 先删除后保存
|
|
|
deleteRecord(quotaVo);
|
|
|
deleteRecord(quotaVo,dataLevel);
|
|
|
if(quotaDataSource.getSourceCode().equals("2") && esConfig.getAggregation()!= null && esConfig.getAggregation().equals("list")) {//来源solr
|
|
|
/* moreThredQuota(tjQuotaLog,esConfig);*/
|
|
|
}else{
|
|
@ -215,7 +220,7 @@ public class EsQuotaJob implements Job {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
private void deleteRecord(QuotaVo quotaVo) throws Exception {
|
|
|
private void deleteRecord(QuotaVo quotaVo,String dataLevel) throws Exception {
|
|
|
EsConfig esConfig = extractHelper.getEsConfig(quotaVo.getCode());
|
|
|
EsConfig sourceEsConfig = extractHelper.getDataSourceEsConfig(quotaVo.getCode());
|
|
|
|
|
@ -239,6 +244,7 @@ public class EsQuotaJob implements Job {
|
|
|
QueryStringQueryBuilder termQueryQuotaCode = QueryBuilders.queryStringQuery(quotaCodeTerm);
|
|
|
QueryBuilder qb = QueryBuilders.boolQuery()
|
|
|
.must(termQueryQuotaCode)
|
|
|
.must(QueryBuilders.matchQuery("timeLevel", dataLevel))
|
|
|
.must(QueryBuilders.boolQuery()
|
|
|
.should(rangeQuotaTime)
|
|
|
.should(rangeCreateTime)
|
|
@ -271,44 +277,108 @@ public class EsQuotaJob implements Job {
|
|
|
* @return
|
|
|
*/
|
|
|
private List<SaveModel> extract(QuotaVo quotaVo) throws Exception {
|
|
|
return SpringUtil.getBean(ExtractHelper.class).extractData(quotaVo, startTime, endTime, timeLevel, saasid);
|
|
|
return SpringUtil.getBean(ExtractHelper.class).extractData(quotaVo, startTime, endTime, timeLevel,dataLevel, saasid);
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* 初始化参数
|
|
|
* @param context
|
|
|
*/
|
|
|
private void initParams(JobExecutionContext context) {
|
|
|
private void initParams(JobExecutionContext context) throws Exception{
|
|
|
JobDataMap map = context.getJobDetail().getJobDataMap();
|
|
|
Map<String, Object> params = context.getJobDetail().getJobDataMap();
|
|
|
Object object = map.get("quota");
|
|
|
if (object != null) {
|
|
|
BeanUtils.copyProperties(object, this.quotaVo);
|
|
|
}
|
|
|
|
|
|
this.saasid = map.getString("saasid");
|
|
|
this.dataLevel = map.getString("dataLevel");
|
|
|
// 默认按天,如果指标有配置时间维度,ES抽取过程中维度字典项转换为 SaveModel 时再覆盖。
|
|
|
this.timeLevel = Contant.main_dimension_timeLevel.day;
|
|
|
this.timeLevel = map.get("timeLevel")!=null?String.valueOf(map.get("timeLevel")):Contant.main_dimension_timeLevel.day;
|
|
|
|
|
|
this.executeFlag = map.getString("executeFlag");
|
|
|
if ("2".equals(executeFlag)) {
|
|
|
if (StringUtils.isEmpty(map.getString("startTime"))) {
|
|
|
startTime = Contant.main_dimension_timeLevel.getStartTime(timeLevel);
|
|
|
} else {
|
|
|
this.startTime = map.getString("startTime").split("T")[0] + "T00:00:00Z";
|
|
|
this.startTime = map.getString("startTime").split("T")[0] + " 00:00:00";
|
|
|
}
|
|
|
if (StringUtils.isEmpty(map.getString("endTime"))) {
|
|
|
endTime = LocalDate.now().toString("yyyy-MM-dd'T'00:00:00'Z'");
|
|
|
endTime = LocalDate.now().toString("yyyy-MM-dd 00:00:00");
|
|
|
} else {
|
|
|
this.endTime = map.getString("endTime").split("T")[0] + "T23:59:59Z";
|
|
|
this.endTime = map.getString("endTime").split("T")[0] + " 23:59:59";
|
|
|
}
|
|
|
}else{
|
|
|
if (StringUtils.isEmpty(endTime)) {
|
|
|
endTime = new LocalDate(new DateTime().minusDays(1)).toString("yyyy-MM-dd") + " 23:59:59"; //2017-06-01 17:00:00
|
|
|
} else if (endTime.length()==10){
|
|
|
endTime = endTime + " 23:59:59";
|
|
|
}
|
|
|
if("2".equals(timeLevel)){
|
|
|
endTime = DateUtil.getSundayOfThisWeek(DateUtil.strToDateLong(endTime));
|
|
|
}else if("3".equals(timeLevel)){
|
|
|
endTime= DateUtil.getLastDayOfMonthThisDate((DateUtil.strToDateLong(endTime)));
|
|
|
}else if("4".equals(timeLevel)){
|
|
|
endTime= DateUtil.getLastDayOfYearThisDate((DateUtil.strToDateLong(endTime)));
|
|
|
}
|
|
|
//初始化统计年份
|
|
|
this.year = getNowYearByDate(endTime);
|
|
|
//初始化开始时间
|
|
|
if ("2".equals(dataLevel)) {
|
|
|
//按年度到达量
|
|
|
startTime = this.year + "-01-01 23:59:59";
|
|
|
} else {
|
|
|
//增量
|
|
|
if (StringUtils.isEmpty(startTime)) {
|
|
|
getStartTime();
|
|
|
} else {
|
|
|
startTime = startTime + " 23:59:59";
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
this.quotaDate = DateUtil.strToDate(endTime);
|
|
|
}
|
|
|
|
|
|
@Transactional
|
|
|
TjQuotaLog saveLog(TjQuotaLog tjQuotaLog) {
|
|
|
TjQuotaLog saveLog(TjQuotaLog tjQuotaLog) {
|
|
|
TjQuotaLog log = tjQuotaLogDao.save(tjQuotaLog);
|
|
|
return log;
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* 获取现在时间属于那个年度
|
|
|
*
|
|
|
* @param date
|
|
|
* @return
|
|
|
*/
|
|
|
public static String getNowYearByDate(String date) throws Exception {
|
|
|
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd");
|
|
|
Date today = simpleDateFormat.parse(date);
|
|
|
String todayString = simpleDateFormat.format(today);
|
|
|
|
|
|
|
|
|
String startDateString = (1900 + today.getYear()) + "-01-01";
|
|
|
Date startDate = simpleDateFormat.parse(startDateString);
|
|
|
if (simpleDateFormat.parse(todayString).after(startDate)) {
|
|
|
return (1900 + today.getYear()) + "";
|
|
|
} else {
|
|
|
return (1900 + today.getYear() - 1) + "";
|
|
|
}
|
|
|
}
|
|
|
|
|
|
public void getStartTime(){
|
|
|
if("1".equals(this.timeLevel)){//日
|
|
|
startTime = new LocalDate(new DateTime().minusDays(1)).toString("yyyy-MM-dd") + " 00:00:00";
|
|
|
}else if("2".equals(this.timeLevel)){//周
|
|
|
startTime = DateUtil.getMondayOfThisWeek(DateUtil.strToDateLong(endTime))+ " 00:00:00";
|
|
|
}else if("3".equals(this.timeLevel)){//月
|
|
|
startTime =DateUtil.getFristDayOfMonthThisDate(DateUtil.strToDateLong(endTime))+ " 00:00:00";
|
|
|
}else if("4".equals(this.timeLevel)){//年
|
|
|
startTime = DateUtil.getFirstDayOfYear(DateUtil.strToDateLong(endTime))+ " 00:00:00";
|
|
|
}
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* 保存数据
|
|
|
*
|