EsQuotaJob.java 14 KB


  1. package com.yihu.quota.job;
  2. import com.yihu.ehr.elasticsearch.ElasticSearchPool;
  3. import com.yihu.quota.dao.TjQuotaLogDao;
  4. import com.yihu.quota.etl.Contant;
  5. import com.yihu.quota.etl.extract.ExtractHelper;
  6. import com.yihu.quota.etl.extract.solr.SolrExtract;
  7. import com.yihu.quota.etl.model.EsConfig;
  8. import com.yihu.quota.etl.save.SaveHelper;
  9. import com.yihu.quota.util.ElasticsearchUtil;
  10. import com.yihu.quota.model.jpa.TjQuotaLog;
  11. import com.yihu.quota.model.jpa.source.TjQuotaDataSource;
  12. import com.yihu.quota.service.source.TjDataSourceService;
  13. import com.yihu.quota.util.SpringUtil;
  14. import com.yihu.quota.vo.QuotaVo;
  15. import com.yihu.quota.vo.SaveModel;
  16. import net.sf.json.JSONObject;
  17. import org.elasticsearch.client.Client;
  18. import org.elasticsearch.index.query.BoolQueryBuilder;
  19. import org.elasticsearch.index.query.QueryBuilders;
  20. import org.elasticsearch.index.query.QueryStringQueryBuilder;
  21. import org.elasticsearch.index.query.RangeQueryBuilder;
  22. import org.joda.time.LocalDate;
  23. import org.quartz.*;
  24. import org.slf4j.Logger;
  25. import org.slf4j.LoggerFactory;
  26. import org.springframework.beans.BeanUtils;
  27. import org.springframework.beans.factory.annotation.Autowired;
  28. import org.springframework.context.annotation.Scope;
  29. import org.springframework.jdbc.core.JdbcTemplate;
  30. import org.springframework.stereotype.Component;
  31. import org.springframework.transaction.annotation.Transactional;
  32. import org.springframework.util.StringUtils;
  33. import org.springframework.web.context.support.SpringBeanAutowiringSupport;
  34. import java.util.Date;
  35. import java.util.List;
  36. import java.util.Map;
  37. /**
  38. * Created by chenweida on 2017/6/6.
  39. */
  40. @Component
  41. @Scope("prototype")
  42. @DisallowConcurrentExecution//防止到了执行时间点前一任务还在执行中,但是这时有空闲的线程,那么马上又会执行,这样一来就会存在同一job被并行执行
  43. public class EsQuotaJob implements Job {
  44. @Autowired
  45. ElasticsearchUtil elasticsearchUtil;
  46. private Logger logger = LoggerFactory.getLogger(EsQuotaJob.class);
  47. private String saasid; // saasid
  48. private QuotaVo quotaVo = new QuotaVo(); // 指标对象
  49. private String endTime; // 结束时间
  50. private String startTime; //开始时间
  51. private String timeLevel; //时间
  52. private String executeFlag; // 执行动作 1 手动执行 2 周期执行
  53. private int haveThreadCount = 0;//已完成线程数
  54. private int threadCount = 1;//总线程数
  55. @Autowired
  56. private TjQuotaLogDao tjQuotaLogDao;
  57. @Autowired
  58. private ExtractHelper extractHelper;
  59. @Autowired
  60. private ElasticSearchPool elasticSearchPool;
  61. @Autowired
  62. private JdbcTemplate jdbcTemplate;
  63. @Autowired
  64. private SolrExtract solrExtract;
  65. @Autowired
  66. private TjDataSourceService dataSourceService;
  67. @Override
  68. public void execute(JobExecutionContext context) throws JobExecutionException {
  69. TjQuotaLog tjQuotaLog = new TjQuotaLog();
  70. String time = "";
  71. try {
  72. //springz注入
  73. SpringBeanAutowiringSupport.processInjectionBasedOnCurrentContext(this);
  74. //初始化参数
  75. initParams(context);
  76. quotaVo.setExecuteFlag(executeFlag);
  77. logger.warn("开始执行指标" + quotaVo.getCode());
  78. tjQuotaLog.setQuotaCode(quotaVo.getCode());
  79. tjQuotaLog.setSaasId(saasid);
  80. tjQuotaLog.setStartTime(new Date());
  81. tjQuotaLog.setStatus(Contant.save_status.executing); //指标执行中
  82. tjQuotaLog.setContent("时间:" + startTime + "到" + endTime + " , " + "任务执行中。");
  83. tjQuotaLog = saveLog(tjQuotaLog);
  84. time = "时间:" + startTime + "到" + endTime + " , ";
  85. TjQuotaDataSource quotaDataSource = dataSourceService.findSourceByQuotaCode(quotaVo.getCode());
  86. if (quotaDataSource == null) {
  87. throw new Exception("数据源配置错误");
  88. }
  89. JSONObject obj = new JSONObject().fromObject(quotaDataSource.getConfigJson());
  90. EsConfig esConfig = (EsConfig) JSONObject.toBean(obj, EsConfig.class);
  91. //查询是否已经统计过,如果已统计 先删除后保存
  92. deleteRecord(quotaVo);
  93. if (quotaDataSource.getSourceCode().equals("2") && esConfig.getAggregation() != null && esConfig.getAggregation().equals("list")) {//来源solr
  94. moreThredQuota(tjQuotaLog, esConfig);
  95. } else {
  96. //统计并保存
  97. quota(tjQuotaLog, quotaVo);
  98. }
  99. } catch (Exception e) {
  100. //如果出錯立即重新執行
  101. tjQuotaLog.setStatus(Contant.save_status.fail);
  102. tjQuotaLog.setEndTime(new Date());
  103. tjQuotaLog.setContent(time + "统计异常," + e.getMessage());
  104. saveLog(tjQuotaLog);
  105. JobExecutionException e2 = new JobExecutionException(e);
  106. e2.setRefireImmediately(true);
  107. e.printStackTrace();
  108. }
  109. }
  110. /*
  111. * solr list 方式 多线程执行指标
  112. */
  113. public void moreThredQuota(TjQuotaLog tjQuotaLog, EsConfig esConfig) {
  114. try {
  115. int perCount = Contant.compute.perCount;
  116. quotaVo.setStart(0);
  117. quotaVo.setRows(perCount);
  118. int rows = solrExtract.getExtractTotal(startTime, endTime, esConfig);
  119. if (rows > perCount * 50) {
  120. throw new Exception("数据量过大请缩小抽取时间范围");
  121. }
  122. if (rows > perCount) {
  123. int count = rows / perCount;
  124. int remainder = rows % perCount;
  125. if (remainder != 0) {
  126. count++;
  127. } else {
  128. remainder = perCount;
  129. }
  130. threadCount = count;
  131. for (int i = 0; i < count; i++) {
  132. //防止过快执行导致参数被覆盖
  133. Thread.sleep(1000);
  134. final int f = i;//传值用。
  135. final TjQuotaLog quotaLogf = tjQuotaLog;//传值用。
  136. final QuotaVo quotaVof = quotaVo;//传值用。
  137. if (f != 0) {
  138. quotaVof.setStart(f * perCount);
  139. } else {
  140. quotaVof.setStart(0);
  141. }
  142. if (i + 1 == count) {
  143. quotaVof.setRows(remainder);
  144. } else {
  145. quotaVof.setRows(perCount);
  146. }
  147. Thread th = new Thread(new Thread() {
  148. public synchronized void run() {
  149. logger.warn("启动第 " + (f + 1) + " 个线程。 ");//只能访问外部的final变量。
  150. quota(quotaLogf, quotaVof);
  151. }
  152. });
  153. Thread.sleep(10000);//延迟10 秒 Es 保存2万条 有时超时,延迟执行减缓个线程同时执行的压力
  154. th.start();
  155. }
  156. } else {
  157. //统计并保存
  158. quota(tjQuotaLog, quotaVo);
  159. }
  160. } catch (Exception e) {
  161. e.printStackTrace();
  162. }
  163. }
  164. /**
  165. * 统计过程
  166. */
  167. public void quota(TjQuotaLog tjQuotaLog, QuotaVo quotaVo) {
  168. String time = "时间:" + startTime + "到" + endTime + " , ";
  169. String status = "";
  170. String content = "";
  171. try {
  172. //抽取数据
  173. List<SaveModel> dataModels = extract(quotaVo);
  174. if (dataModels != null && dataModels.size() > 0) {
  175. //保存数据
  176. Boolean success = saveData(dataModels, quotaVo);
  177. status = success ? Contant.save_status.success : Contant.save_status.fail;
  178. content = success ? time + "统计保存成功" : time + "统计保存失败";
  179. logger.info(content + dataModels.size());
  180. haveThreadCount++;
  181. } else {
  182. status = Contant.save_status.fail;
  183. content = time + "没有抽取到数据";
  184. haveThreadCount++;
  185. }
  186. // 初始执行时,更新该指标为已初始执行过
  187. if (quotaVo.getExecuteFlag().equals("1")) {
  188. String sql = "UPDATE tj_quota SET is_init_exec = '1' WHERE id = " + quotaVo.getId();
  189. jdbcTemplate.update(sql);
  190. }
  191. } catch (Exception e) {
  192. haveThreadCount++;
  193. tjQuotaLog.setStatus(Contant.save_status.fail);
  194. tjQuotaLog.setContent(e.getMessage());
  195. tjQuotaLog = saveLog(tjQuotaLog);
  196. e.printStackTrace();
  197. }
  198. if (threadCount > 1) {
  199. if (haveThreadCount == threadCount) {
  200. tjQuotaLog.setStatus(Contant.save_status.success);
  201. tjQuotaLog.setContent(time + "统计保存成功");
  202. logger.warn("指标" + tjQuotaLog.getQuotaCode() + "统计成功 结束!");
  203. } else {
  204. tjQuotaLog.setStatus(Contant.save_status.fail);
  205. tjQuotaLog.setContent(time + "统计保存失败");
  206. }
  207. tjQuotaLog.setEndTime(new Date());
  208. saveLog(tjQuotaLog);
  209. } else {
  210. tjQuotaLog.setStatus(status);
  211. tjQuotaLog.setContent(content);
  212. tjQuotaLog.setEndTime(new Date());
  213. saveLog(tjQuotaLog);
  214. logger.warn("结束!" + content);
  215. }
  216. }
  217. private void deleteRecord(QuotaVo quotaVo) throws Exception {
  218. EsConfig esConfig = extractHelper.getEsConfig(quotaVo.getCode());
  219. EsConfig sourceEsConfig = extractHelper.getDataSourceEsConfig(quotaVo.getCode());
  220. BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
  221. QueryStringQueryBuilder termQueryQuotaCode = QueryBuilders.queryStringQuery("quotaCode:" + quotaVo.getCode().replaceAll("_", ""));
  222. boolQueryBuilder.must(termQueryQuotaCode);
  223. String start = "";
  224. String end = "";
  225. if (sourceEsConfig.getFullQuery() != null && sourceEsConfig.getFullQuery().equals("true")) {
  226. start = LocalDate.now().toString();
  227. end = start;
  228. } else {
  229. if (!StringUtils.isEmpty(startTime)) {
  230. start = startTime;
  231. }
  232. if (!StringUtils.isEmpty(endTime)) {
  233. end = endTime;
  234. }
  235. }
  236. RangeQueryBuilder rangeQueryStartTime = QueryBuilders.rangeQuery("quotaDate").gte(start.substring(0, 10));
  237. boolQueryBuilder.must(rangeQueryStartTime);
  238. RangeQueryBuilder rangeQueryEndTime = QueryBuilders.rangeQuery("quotaDate").lte(end.substring(0, 10));
  239. boolQueryBuilder.must(rangeQueryEndTime);
  240. boolean flag = true;
  241. Client talClient = elasticSearchPool.getClient();
  242. Client client = elasticSearchPool.getClient();
  243. try {
  244. while (flag) {
  245. long count = elasticsearchUtil.getTotalCount(talClient, esConfig.getIndex(), esConfig.getType(), boolQueryBuilder);
  246. if (count != 0) {
  247. boolean successFlag = elasticsearchUtil.queryDelete(client, esConfig.getIndex(), esConfig.getType(), boolQueryBuilder);
  248. if (!successFlag) {
  249. throw new Exception("Elasticsearch 指标统计时原始数据删除失败");
  250. }
  251. } else {
  252. flag = false;
  253. }
  254. }
  255. } catch (Exception e) {
  256. e.printStackTrace();
  257. throw new Exception("Elasticsearch 指标统计时原始数据删除异常");
  258. } finally {
  259. talClient.close();
  260. client.close();
  261. logger.debug(quotaVo.getCode() + " delete success");
  262. }
  263. }
  264. /**
  265. * 抽取数据
  266. *
  267. * @return
  268. */
  269. private List<SaveModel> extract(QuotaVo quotaVo) throws Exception {
  270. return SpringUtil.getBean(ExtractHelper.class).extractData(quotaVo, startTime, endTime, timeLevel, saasid);
  271. }
  272. /**
  273. * 初始化参数
  274. *
  275. * @param context
  276. */
  277. private void initParams(JobExecutionContext context) {
  278. JobDataMap map = context.getJobDetail().getJobDataMap();
  279. Map<String, Object> params = context.getJobDetail().getJobDataMap();
  280. Object object = map.get("quota");
  281. if (object != null) {
  282. BeanUtils.copyProperties(object, this.quotaVo);
  283. }
  284. this.saasid = map.getString("saasid");
  285. // 默认按天,如果指标有配置时间维度,ES抽取过程中维度字典项转换为 SaveModel 时再覆盖。
  286. this.timeLevel = Contant.main_dimension_timeLevel.day;
  287. this.executeFlag = map.getString("executeFlag");
  288. if ("2".equals(executeFlag)) {
  289. if (StringUtils.isEmpty(map.getString("startTime"))) {
  290. startTime = Contant.main_dimension_timeLevel.getStartTime(timeLevel);
  291. } else {
  292. this.startTime = map.getString("startTime").split("T")[0] + "T00:00:00Z";
  293. }
  294. if (StringUtils.isEmpty(map.getString("endTime"))) {
  295. endTime = LocalDate.now().toString("yyyy-MM-dd'T'00:00:00'Z'");
  296. } else {
  297. this.endTime = map.getString("endTime").split("T")[0] + "T23:59:59Z";
  298. }
  299. }
  300. }
  301. @Transactional
  302. private TjQuotaLog saveLog(TjQuotaLog tjQuotaLog) {
  303. TjQuotaLog log = tjQuotaLogDao.save(tjQuotaLog);
  304. return log;
  305. }
  306. /**
  307. * 保存数据
  308. *
  309. * @param dataModels
  310. */
  311. private Boolean saveData(List<SaveModel> dataModels, QuotaVo quotaVo) {
  312. try {
  313. return SpringUtil.getBean(SaveHelper.class).save(dataModels, quotaVo);
  314. } catch (Exception e) {
  315. logger.error("save error:" + e.getMessage());
  316. }
  317. return false;
  318. }
  319. }