EsQuotaPercentJob.java 7.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191
  1. package com.yihu.quota.job;
  2. import com.yihu.quota.dao.TjQuotaLogDao;
  3. import com.yihu.quota.etl.Contant;
  4. import com.yihu.quota.etl.extract.ExtractHelper;
  5. import com.yihu.quota.etl.extract.ExtractPercentHelper;
  6. import com.yihu.quota.etl.model.EsConfig;
  7. import com.yihu.quota.etl.save.SaveHelper;
  8. import com.yihu.quota.model.TjQuotaLog;
  9. import com.yihu.quota.util.ElasticsearchUtil;
  10. import com.yihu.quota.util.SpringUtil;
  11. import com.yihu.quota.vo.QuotaVo;
  12. import com.yihu.quota.vo.SaveModel;
  13. import org.elasticsearch.client.Client;
  14. import org.elasticsearch.index.query.BoolQueryBuilder;
  15. import org.elasticsearch.index.query.QueryBuilders;
  16. import org.elasticsearch.index.query.QueryStringQueryBuilder;
  17. import org.elasticsearch.index.query.RangeQueryBuilder;
  18. import org.joda.time.LocalDate;
  19. import org.quartz.*;
  20. import org.slf4j.Logger;
  21. import org.slf4j.LoggerFactory;
  22. import org.springframework.beans.BeanUtils;
  23. import org.springframework.beans.factory.annotation.Autowired;
  24. import org.springframework.context.annotation.Scope;
  25. import org.springframework.stereotype.Component;
  26. import org.springframework.transaction.annotation.Transactional;
  27. import org.springframework.util.StringUtils;
  28. import org.springframework.web.context.support.SpringBeanAutowiringSupport;
  29. import java.util.ArrayList;
  30. import java.util.Date;
  31. import java.util.List;
  32. /**
  33. * @author janseny
  34. * @date 2017/8/22
  35. * DisallowConcurrentExecution 防止到了执行时间点前一任务还在执行中,但是这时有空闲的线程,那么马上又会执行,这样一来就会存在同一job被并行执行
  36. */
  37. @Component
  38. @Scope("prototype")
  39. @DisallowConcurrentExecution
  40. public class EsQuotaPercentJob implements Job {
  41. @Autowired
  42. ElasticsearchUtil elasticsearchUtil;
  43. private Logger logger = LoggerFactory.getLogger(EsQuotaPercentJob.class);
  44. private QuotaVo quotaVo = new QuotaVo();//指标对象
  45. private String endTime;//结束时间
  46. private String startTime;//开始时间
  47. private String timeLevel;//时间
  48. private String molecular;//分子
  49. private String denominator;//分母
  50. @Autowired
  51. private TjQuotaLogDao tjQuotaLogDao;
  52. @Autowired
  53. private ElasticsearchUtil esClientUtil;
  54. @Autowired
  55. private ExtractHelper extractHelper;
  56. @Override
  57. public void execute(JobExecutionContext context) throws JobExecutionException {
  58. try {
  59. //springz注入
  60. SpringBeanAutowiringSupport.processInjectionBasedOnCurrentContext(this);
  61. //初始化参数
  62. initParams(context);
  63. //统计
  64. quota();
  65. } catch (Exception e) {
  66. //如果出錯立即重新執行
  67. JobExecutionException e2 = new JobExecutionException(e);
  68. e2.setRefireImmediately(true);
  69. e.printStackTrace();
  70. }
  71. }
  72. /**
  73. * 统计过程
  74. */
  75. private void quota() {
  76. TjQuotaLog tjQuotaLog = new TjQuotaLog();
  77. tjQuotaLog.setQuotaCode(quotaVo.getCode());
  78. tjQuotaLog.setStartTime(new Date());
  79. String message = "";
  80. try {
  81. //抽取数据计算数据
  82. List<SaveModel> dataModels = extract();
  83. if (dataModels != null && dataModels.size() > 0) {
  84. //查询是否已经统计过,如果已统计 先删除后保存
  85. EsConfig esConfig = extractHelper.getEsConfig(quotaVo.getCode());
  86. BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
  87. QueryStringQueryBuilder termQueryQuotaCode = QueryBuilders.queryStringQuery("quotaCode:" + quotaVo.getCode().replaceAll("_", ""));
  88. boolQueryBuilder.must(termQueryQuotaCode);
  89. if (!StringUtils.isEmpty(startTime)) {
  90. RangeQueryBuilder rangeQueryStartTime = QueryBuilders.rangeQuery("quotaDate").gte(startTime);
  91. boolQueryBuilder.must(rangeQueryStartTime);
  92. }
  93. if (!StringUtils.isEmpty(endTime)) {
  94. RangeQueryBuilder rangeQueryEndTime = QueryBuilders.rangeQuery("quotaDate").lte(endTime);
  95. boolQueryBuilder.must(rangeQueryEndTime);
  96. }
  97. Client client = esClientUtil.getClient(esConfig.getHost(), esConfig.getPort(), esConfig.getClusterName());
  98. try {
  99. elasticsearchUtil.queryDelete(client, esConfig.getIndex(), esConfig.getType(), boolQueryBuilder);
  100. } catch (Exception e) {
  101. e.getMessage();
  102. } finally {
  103. client.close();
  104. }
  105. List<SaveModel> dataSaveModels = new ArrayList<>();
  106. for (SaveModel saveModel : dataModels) {
  107. if (saveModel.getResult() != null) {
  108. dataSaveModels.add(saveModel);
  109. }
  110. }
  111. //保存数据
  112. Boolean success = saveDate(dataSaveModels);
  113. tjQuotaLog.setStatus(success ? Contant.save_status.success : Contant.save_status.fail);
  114. tjQuotaLog.setContent(success ? "统计保存成功" : "统计数据ElasticSearch保存失败");
  115. } else {
  116. tjQuotaLog.setStatus(Contant.save_status.fail);
  117. tjQuotaLog.setContent("没有抽取到数据");
  118. }
  119. } catch (Exception e) {
  120. logger.error(e.getMessage());
  121. message = e.getMessage();
  122. tjQuotaLog.setStatus(Contant.save_status.fail);
  123. tjQuotaLog.setContent(message);
  124. }
  125. tjQuotaLog.setEndTime(new Date());
  126. saveLog(tjQuotaLog);
  127. }
  128. /**
  129. * 抽取数据
  130. *
  131. * @return
  132. */
  133. private List<SaveModel> extract() throws Exception {
  134. return SpringUtil.getBean(ExtractPercentHelper.class).extractData(quotaVo, startTime, endTime, timeLevel);
  135. }
  136. /**
  137. * 初始化参数
  138. *
  139. * @param context
  140. */
  141. private void initParams(JobExecutionContext context) {
  142. JobDataMap map = context.getJobDetail().getJobDataMap();
  143. this.molecular = map.getString("molecular");
  144. this.denominator = map.getString("denominator");
  145. this.endTime = map.getString("endTime");
  146. if (StringUtils.isEmpty(endTime)) {
  147. endTime = LocalDate.now().toString("yyyy-MM-dd"); //2017-06-01 默认今天
  148. }
  149. this.startTime = map.getString("startTime");
  150. if (StringUtils.isEmpty(startTime)) {
  151. startTime = Contant.main_dimension_timeLevel.getStartTime(timeLevel);//默认是昨天
  152. }
  153. this.timeLevel = (String) map.get("timeLevel");
  154. if (StringUtils.isEmpty(this.timeLevel)) {
  155. this.timeLevel = Contant.main_dimension_timeLevel.day;
  156. }
  157. Object object = map.get("quota");
  158. if (object != null) {
  159. BeanUtils.copyProperties(object, this.quotaVo);
  160. }
  161. }
  162. @Transactional
  163. private void saveLog(TjQuotaLog tjQuotaLog) {
  164. tjQuotaLogDao.save(tjQuotaLog);
  165. }
  166. /**
  167. * 保存数据
  168. *
  169. * @param dataModels
  170. */
  171. private Boolean saveDate(List<SaveModel> dataModels) {
  172. try {
  173. return SpringUtil.getBean(SaveHelper.class).save(dataModels, quotaVo);
  174. } catch (Exception e) {
  175. logger.error("save error:" + e.getMessage());
  176. }
  177. return false;
  178. }
  179. }