SolrExtract.java 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274
  1. package com.yihu.quota.etl.extract.solr;
  2. import com.fasterxml.jackson.databind.ObjectMapper;
  3. import com.yihu.ehr.elasticsearch.ElasticSearchUtil;
  4. import com.yihu.ehr.query.common.model.SolrGroupEntity;
  5. import com.yihu.ehr.query.services.SolrQuery;
  6. import com.yihu.ehr.util.datetime.DateUtil;
  7. import com.yihu.quota.dao.save.TjQuotaDataSaveDao;
  8. import com.yihu.quota.etl.Contant;
  9. import com.yihu.quota.etl.ExtractConverUtil;
  10. import com.yihu.quota.etl.extract.ExtractUtil;
  11. import com.yihu.quota.etl.model.EsConfig;
  12. import com.yihu.quota.model.dimension.TjQuotaDimensionMain;
  13. import com.yihu.quota.model.dimension.TjQuotaDimensionSlave;
  14. import com.yihu.quota.model.save.TjQuotaDataSave;
  15. import com.yihu.quota.vo.FilterModel;
  16. import com.yihu.quota.vo.QuotaVo;
  17. import com.yihu.quota.vo.SaveModel;
  18. import org.slf4j.Logger;
  19. import org.slf4j.LoggerFactory;
  20. import org.springframework.beans.factory.annotation.Autowired;
  21. import org.springframework.context.annotation.Scope;
  22. import org.springframework.stereotype.Component;
  23. import org.springframework.util.StringUtils;
  24. import java.io.IOException;
  25. import java.util.*;
  26. /**
  27. * 对solr抽取数据,基于指标配置维度(不能包括按周、月、年等时间维度) +
  28. * 默认按天维度统计值作为最小单位分组聚合,
  29. * 保存聚合结果到ES。二次统计基于以上的聚合结果进行统计。
  30. * <p>
  31. * Created by janseny on 2017/7/10.
  32. */
  33. @Deprecated
  34. @Component
  35. @Scope("prototype")
  36. public class SolrExtract {
  37. private Logger logger = LoggerFactory.getLogger(SolrExtract.class);
  38. @Autowired
  39. private ExtractUtil extractUtil;
  40. @Autowired
  41. private SolrQuery solrQuery;
  42. @Autowired
  43. private ExtractConverUtil extractConverUtil;
  44. @Autowired
  45. private ElasticSearchUtil esUtil;
  46. @Autowired
  47. private TjQuotaDataSaveDao tjQuotaDataSaveDao;
  48. @Autowired
  49. private ObjectMapper objectMapper;
  50. private QuotaVo quotaVo;
  51. private String startTime;
  52. private String endTime;
  53. private String timeLevel;
  54. private EsConfig esConfig;
  55. public List<SaveModel> extract(List<TjQuotaDimensionMain> qdm,//主维度
  56. List<TjQuotaDimensionSlave> qds,//细维度
  57. String startTime,//开始时间
  58. String endTime, //结束时间
  59. String timeLevel, // 时间维度,默认且只按天统计
  60. QuotaVo quotaVo,//指标配置
  61. EsConfig esConfig //es配置
  62. ) throws Exception {
  63. this.startTime = startTime;
  64. this.endTime = endTime;
  65. this.timeLevel = timeLevel;
  66. this.quotaVo = quotaVo;
  67. this.esConfig = esConfig;
  68. solrQuery.initParams(this.startTime, this.endTime);
  69. // 统计数据
  70. return statiscSlor(qdm, qds, quotaVo);
  71. }
  72. public int getExtractTotal(String startTime, String endTime, EsConfig esConfig) throws Exception {
  73. this.startTime = startTime;
  74. this.endTime = endTime;
  75. this.esConfig = esConfig;
  76. solrQuery.initParams(this.startTime, this.endTime);
  77. String core = esConfig.getTable(); // solr的core名
  78. String q = null; // 过滤条件
  79. // 统计数据数量
  80. String timeKey = esConfig.getTimekey();
  81. if (!StringUtils.isEmpty(timeKey)) {
  82. if (!StringUtils.isEmpty(startTime) && !StringUtils.isEmpty(endTime)) {
  83. q = String.format("%s:[%s TO %s]", timeKey, startTime, endTime);
  84. } else {
  85. q = timeKey + ":[* TO *]";
  86. }
  87. }
  88. long rows = solrQuery.count(core, q, esConfig.getFilter());
  89. return Integer.valueOf(String.valueOf(rows));
  90. }
  91. public List<SaveModel> statiscSlor(List<TjQuotaDimensionMain> qdm,
  92. List<TjQuotaDimensionSlave> qds,
  93. QuotaVo quotaVo) throws Exception {
  94. List<SaveModel> returnList = new ArrayList<>();
  95. String core = esConfig.getTable(); // solr的core名
  96. String q = null; // 查询条件
  97. String fq = null; // 过滤条件
  98. if (esConfig.getFilter() != null) {
  99. fq = esConfig.getFilter();
  100. }
  101. String fl = ""; // 结果指定查询字段
  102. List<SolrGroupEntity> dimensionGroupList = new ArrayList<>(); // 维度分组统计条件
  103. if (StringUtils.isEmpty(esConfig.getTimekey())) {
  104. throw new Exception("数据源配置 timeKey 不能为空!");
  105. }
  106. String timeKey = esConfig.getTimekey();
  107. Map<String, String> mainMap = new HashMap<>();
  108. Map<String, String> slaveMap = new HashMap<>();
  109. for (int i = 0; i < qdm.size(); i++) {
  110. String key = qdm.get(i).getKeyVal();
  111. mainMap.put(key, key);
  112. dimensionGroupList.add(new SolrGroupEntity(key, SolrGroupEntity.GroupType.FIELD_VALUE));
  113. fl += key + ",";
  114. if (qdm.get(i).getMainCode().equals("org")) {
  115. String orgFilter = " AND org_code:*";
  116. if (!StringUtils.isEmpty(fq)) {
  117. fq += orgFilter;
  118. } else {
  119. fq = orgFilter;
  120. }
  121. }
  122. if (qdm.get(i).getMainCode().equals("town")) {
  123. String townFilter = " AND org_area:*";
  124. if (!StringUtils.isEmpty(fq)) {
  125. fq += townFilter;
  126. } else {
  127. fq = townFilter;
  128. }
  129. }
  130. }
  131. for (int i = 0; i < qds.size(); i++) {
  132. String key = qds.get(i).getKeyVal();
  133. slaveMap.put(key, key);
  134. dimensionGroupList.add(new SolrGroupEntity(key, SolrGroupEntity.GroupType.FIELD_VALUE));
  135. fl += key + ",";
  136. }
  137. fl += timeKey + ",rowkey";
  138. if (StringUtils.isEmpty(esConfig.getAggregation()) || (!esConfig.getAggregation().equals(Contant.quota.aggregation_list) && !esConfig.getAggregation().equals(Contant.quota.aggregation_distinct))) {
  139. // 默认追加一个日期字段作为细维度,方便按天统计作为最小单位统计值。
  140. slaveMap.put(timeKey, timeKey);
  141. TjQuotaDimensionSlave daySlave = new TjQuotaDimensionSlave();
  142. daySlave.setSlaveCode(timeKey);
  143. daySlave.setKeyVal(timeKey);
  144. qds.add(daySlave);
  145. dimensionGroupList.add(new SolrGroupEntity(timeKey, SolrGroupEntity.GroupType.DATE_RANGE, "+1DAY"));
  146. }
  147. // 拼接增量或全量的筛选条件
  148. if (!StringUtils.isEmpty(timeKey)) {
  149. if (!StringUtils.isEmpty(startTime) && !StringUtils.isEmpty(endTime)) {
  150. q = String.format("%s:[%s TO %s]", timeKey, startTime, endTime);
  151. } else {
  152. q = timeKey + ":[* TO *]";
  153. }
  154. }
  155. boolean listFlag = false;
  156. // 最后一个维度基于其他维度组合作为条件的统计结果的集合
  157. List<Map<String, Object>> list = new ArrayList<>();
  158. if (StringUtils.isEmpty(esConfig.getAggregation())
  159. || Contant.quota.aggregation_count.equals(esConfig.getAggregation())) {
  160. // count 聚合
  161. list = solrQuery.getCountMultList(core, q, fq, dimensionGroupList, null);
  162. } else if (!StringUtils.isEmpty(esConfig.getAggregationKey()) && Contant.quota.aggregation_sum.equals(esConfig.getAggregation())) {
  163. // sum 聚合
  164. list = solrQuery.getSumMultList(core, q, fq, esConfig.getAggregationKey(), dimensionGroupList, null);
  165. } else if (Contant.quota.aggregation_list.equals(esConfig.getAggregation())) {
  166. listFlag = true;
  167. // 查询列表
  168. try {
  169. if (esConfig.getAggregation().equals(Contant.quota.aggregation_list) && !StringUtils.isEmpty(esConfig.getAggregationKey())) {
  170. fl = fl + "," + esConfig.getAggregationKey();
  171. }
  172. logger.warn("solr 从" + quotaVo.getStart() + " 开始获取数据,这次准备获取" + quotaVo.getRows() + "条");
  173. list = solrQuery.queryReturnFieldList(core, q, fq, null, quotaVo.getStart(), quotaVo.getRows(), fl.split(","));
  174. } catch (Exception e) {
  175. throw new Exception("solr 查询异常 " + e.getMessage());
  176. }
  177. } else if (Contant.quota.aggregation_distinct.equals(esConfig.getAggregation())) {
  178. listFlag = true;
  179. // 去重查询
  180. fl += "," + esConfig.getDistinctGroupField();
  181. boolean groupNullIsolate = esConfig.getDistinctGroupNullIsolate() != null ? esConfig.getDistinctGroupNullIsolate() : false;
  182. int count = (int) solrQuery.count(core, q, fq);
  183. list = solrQuery.distinctQueryReturnFieldList(core, q, fq, null, 0, count, fl.split(","),
  184. esConfig.getDistinctGroupField(), esConfig.getDistinctGroupSort(), groupNullIsolate);
  185. // 对比ES中如果已存在该条数据则更新,并从集合中移除该条数据。
  186. checkEsDistinctData(list);
  187. }
  188. //数据转换
  189. FilterModel filterModel = new FilterModel(list, null);
  190. filterModel = extractConverUtil.convert(filterModel, qds);
  191. if (filterModel != null && filterModel.getDataList() != null) {
  192. list = filterModel.getDataList();
  193. }
  194. Map<String, String> statisticsResultMap = new LinkedHashMap<>(); // 统计结果集
  195. Map<String, String> daySlaveDictMap = new LinkedHashMap<>(); // 按天统计的所有日期项
  196. if (listFlag) {
  197. if (list != null && list.size() > 0) {
  198. returnList = extractUtil.computeList(qdm, qds, list, timeKey, esConfig.getAggregationKey(), quotaVo);
  199. }
  200. } else {
  201. if (list != null && list.size() > 0) {
  202. for (Map<String, Object> objectMap : list) {
  203. String statisticsKey = objectMap.get("$statisticsKey").toString();
  204. String result = objectMap.get("$result").toString();
  205. String quotaDate = objectMap.get(timeKey).toString();
  206. statisticsResultMap.put(statisticsKey, result);
  207. daySlaveDictMap.put(statisticsKey, quotaDate);
  208. }
  209. }
  210. // 融合主细维度、其组合统计值为SaveModel
  211. extractUtil.compute(qdm, qds, returnList, statisticsResultMap, daySlaveDictMap, quotaVo);
  212. }
  213. return returnList;
  214. }
  215. /**
  216. * 去重查询后,对比 ES 中如果已存在该条数据,且去重集合中有更前的则更新quotaDate,并从去重集合中移除该条数据。
  217. *
  218. * @param distinctQueryList 去重查询结果集
  219. */
  220. private void checkEsDistinctData(List<Map<String, Object>> distinctQueryList) throws IOException {
  221. List<Map<String, Object>> updateList = new ArrayList<>();
  222. String esIndex = null;
  223. String esType = null;
  224. if (distinctQueryList.size() > 0) {
  225. TjQuotaDataSave quotaDataSave = tjQuotaDataSaveDao.findByQuotaCode(quotaVo.getCode());
  226. Map<String, Object> configJson = objectMapper.readValue(quotaDataSave.getConfigJson(), Map.class);
  227. esIndex = configJson.get("index").toString();
  228. esType = configJson.get("type").toString();
  229. }
  230. Iterator<Map<String, Object>> iterator = distinctQueryList.iterator();
  231. while (iterator.hasNext()) {
  232. Map<String, Object> item = iterator.next();
  233. String timeKeyValue = DateUtil.formatDate((Date) item.get(esConfig.getTimekey()), DateUtil.DEFAULT_DATE_YMD_FORMAT);
  234. String distinctField = item.get("distinctField").toString();
  235. String distinctFieldValue = item.get("distinctFieldValue").toString();
  236. String filters = "quotaCode=" + quotaVo.getCode().replace("_", "") + ";distinctField=" + distinctField + ";distinctFieldValue=" + distinctFieldValue;
  237. List<Map<String, Object>> existedList = esUtil.list(esIndex, esType, filters);
  238. if (existedList.size() > 0) {
  239. Map<String, Object> quotaData = existedList.get(0);
  240. String quotaDate = quotaData.get("quotaDate").toString();
  241. if (DateUtil.compareDate(DateUtil.DEFAULT_DATE_YMD_FORMAT, timeKeyValue, quotaDate) < 0) {
  242. quotaData.put("quotaDate", timeKeyValue);
  243. updateList.add(quotaData);
  244. iterator.remove();
  245. }
  246. }
  247. }
  248. if (updateList.size() != 0) {
  249. esUtil.bulkUpdate(esIndex, esType, updateList);
  250. }
  251. }
  252. }