123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191 |
- package com.yihu.quota.job;
- import com.yihu.quota.dao.TjQuotaLogDao;
- import com.yihu.quota.etl.Contant;
- import com.yihu.quota.etl.extract.ExtractHelper;
- import com.yihu.quota.etl.extract.ExtractPercentHelper;
- import com.yihu.quota.etl.model.EsConfig;
- import com.yihu.quota.etl.save.SaveHelper;
- import com.yihu.quota.model.TjQuotaLog;
- import com.yihu.quota.util.ElasticsearchUtil;
- import com.yihu.quota.util.SpringUtil;
- import com.yihu.quota.vo.QuotaVo;
- import com.yihu.quota.vo.SaveModel;
- import org.elasticsearch.client.Client;
- import org.elasticsearch.index.query.BoolQueryBuilder;
- import org.elasticsearch.index.query.QueryBuilders;
- import org.elasticsearch.index.query.QueryStringQueryBuilder;
- import org.elasticsearch.index.query.RangeQueryBuilder;
- import org.joda.time.LocalDate;
- import org.quartz.*;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.beans.BeanUtils;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.context.annotation.Scope;
- import org.springframework.stereotype.Component;
- import org.springframework.transaction.annotation.Transactional;
- import org.springframework.util.StringUtils;
- import org.springframework.web.context.support.SpringBeanAutowiringSupport;
- import java.util.ArrayList;
- import java.util.Date;
- import java.util.List;
- /**
- * @author janseny
- * @date 2017/8/22
- * DisallowConcurrentExecution 防止到了执行时间点前一任务还在执行中,但是这时有空闲的线程,那么马上又会执行,这样一来就会存在同一job被并行执行
- */
- @Component
- @Scope("prototype")
- @DisallowConcurrentExecution
- public class EsQuotaPercentJob implements Job {
- @Autowired
- ElasticsearchUtil elasticsearchUtil;
- private Logger logger = LoggerFactory.getLogger(EsQuotaPercentJob.class);
- private QuotaVo quotaVo = new QuotaVo();//指标对象
- private String endTime;//结束时间
- private String startTime;//开始时间
- private String timeLevel;//时间
- private String molecular;//分子
- private String denominator;//分母
- @Autowired
- private TjQuotaLogDao tjQuotaLogDao;
- @Autowired
- private ElasticsearchUtil esClientUtil;
- @Autowired
- private ExtractHelper extractHelper;
- @Override
- public void execute(JobExecutionContext context) throws JobExecutionException {
- try {
- //springz注入
- SpringBeanAutowiringSupport.processInjectionBasedOnCurrentContext(this);
- //初始化参数
- initParams(context);
- //统计
- quota();
- } catch (Exception e) {
- //如果出錯立即重新執行
- JobExecutionException e2 = new JobExecutionException(e);
- e2.setRefireImmediately(true);
- e.printStackTrace();
- }
- }
- /**
- * 统计过程
- */
- private void quota() {
- TjQuotaLog tjQuotaLog = new TjQuotaLog();
- tjQuotaLog.setQuotaCode(quotaVo.getCode());
- tjQuotaLog.setStartTime(new Date());
- String message = "";
- try {
- //抽取数据计算数据
- List<SaveModel> dataModels = extract();
- if (dataModels != null && dataModels.size() > 0) {
- //查询是否已经统计过,如果已统计 先删除后保存
- EsConfig esConfig = extractHelper.getEsConfig(quotaVo.getCode());
- BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
- QueryStringQueryBuilder termQueryQuotaCode = QueryBuilders.queryStringQuery("quotaCode:" + quotaVo.getCode().replaceAll("_", ""));
- boolQueryBuilder.must(termQueryQuotaCode);
- if (!StringUtils.isEmpty(startTime)) {
- RangeQueryBuilder rangeQueryStartTime = QueryBuilders.rangeQuery("quotaDate").gte(startTime);
- boolQueryBuilder.must(rangeQueryStartTime);
- }
- if (!StringUtils.isEmpty(endTime)) {
- RangeQueryBuilder rangeQueryEndTime = QueryBuilders.rangeQuery("quotaDate").lte(endTime);
- boolQueryBuilder.must(rangeQueryEndTime);
- }
- Client client = esClientUtil.getClient(esConfig.getHost(), esConfig.getPort(), esConfig.getClusterName());
- try {
- elasticsearchUtil.queryDelete(client, esConfig.getIndex(), esConfig.getType(), boolQueryBuilder);
- } catch (Exception e) {
- e.getMessage();
- } finally {
- client.close();
- }
- List<SaveModel> dataSaveModels = new ArrayList<>();
- for (SaveModel saveModel : dataModels) {
- if (saveModel.getResult() != null) {
- dataSaveModels.add(saveModel);
- }
- }
- //保存数据
- Boolean success = saveDate(dataSaveModels);
- tjQuotaLog.setStatus(success ? Contant.save_status.success : Contant.save_status.fail);
- tjQuotaLog.setContent(success ? "统计保存成功" : "统计数据ElasticSearch保存失败");
- } else {
- tjQuotaLog.setStatus(Contant.save_status.fail);
- tjQuotaLog.setContent("没有抽取到数据");
- }
- } catch (Exception e) {
- logger.error(e.getMessage());
- message = e.getMessage();
- tjQuotaLog.setStatus(Contant.save_status.fail);
- tjQuotaLog.setContent(message);
- }
- tjQuotaLog.setEndTime(new Date());
- saveLog(tjQuotaLog);
- }
- /**
- * 抽取数据
- *
- * @return
- */
- private List<SaveModel> extract() throws Exception {
- return SpringUtil.getBean(ExtractPercentHelper.class).extractData(quotaVo, startTime, endTime, timeLevel);
- }
- /**
- * 初始化参数
- *
- * @param context
- */
- private void initParams(JobExecutionContext context) {
- JobDataMap map = context.getJobDetail().getJobDataMap();
- this.molecular = map.getString("molecular");
- this.denominator = map.getString("denominator");
- this.endTime = map.getString("endTime");
- if (StringUtils.isEmpty(endTime)) {
- endTime = LocalDate.now().toString("yyyy-MM-dd"); //2017-06-01 默认今天
- }
- this.startTime = map.getString("startTime");
- if (StringUtils.isEmpty(startTime)) {
- startTime = Contant.main_dimension_timeLevel.getStartTime(timeLevel);//默认是昨天
- }
- this.timeLevel = (String) map.get("timeLevel");
- if (StringUtils.isEmpty(this.timeLevel)) {
- this.timeLevel = Contant.main_dimension_timeLevel.day;
- }
- Object object = map.get("quota");
- if (object != null) {
- BeanUtils.copyProperties(object, this.quotaVo);
- }
- }
- @Transactional
- private void saveLog(TjQuotaLog tjQuotaLog) {
- tjQuotaLogDao.save(tjQuotaLog);
- }
- /**
- * 保存数据
- *
- * @param dataModels
- */
- private Boolean saveDate(List<SaveModel> dataModels) {
- try {
- return SpringUtil.getBean(SaveHelper.class).save(dataModels, quotaVo);
- } catch (Exception e) {
- logger.error("save error:" + e.getMessage());
- }
- return false;
- }
- }
|