AggregationBuildHandler.java 21 KB


  1. package com.yihu.quota.util;
  2. import com.github.abel533.echarts.Option;
  3. import com.yihu.quota.dao.view.ViewDao;
  4. import com.yihu.quota.dao.view.ViewDimensionDao;
  5. import com.yihu.quota.dao.view.ViewQuotaDao;
  6. import com.yihu.quota.dao.view.ViewQuotaFilterDao;
  7. import com.yihu.quota.model.view.View;
  8. import com.yihu.quota.model.view.ViewDimension;
  9. import com.yihu.quota.model.view.ViewQuota;
  10. import com.yihu.quota.model.view.ViewQuotaFilter;
  11. import com.yihu.quota.service.cube.CubeService;
  12. import org.elasticsearch.action.search.SearchRequestBuilder;
  13. import org.elasticsearch.action.search.SearchResponse;
  14. import org.elasticsearch.client.transport.TransportClient;
  15. import org.elasticsearch.index.query.BoolQueryBuilder;
  16. import org.elasticsearch.search.aggregations.AbstractAggregationBuilder;
  17. import org.elasticsearch.search.aggregations.Aggregation;
  18. import org.elasticsearch.search.aggregations.AggregationBuilders;
  19. import org.elasticsearch.search.aggregations.bucket.filter.InternalFilter;
  20. import org.elasticsearch.search.aggregations.bucket.terms.*;
  21. import org.elasticsearch.search.aggregations.metrics.avg.InternalAvg;
  22. import org.elasticsearch.search.aggregations.metrics.cardinality.InternalCardinality;
  23. import org.elasticsearch.search.aggregations.metrics.max.InternalMax;
  24. import org.elasticsearch.search.aggregations.metrics.min.InternalMin;
  25. import org.elasticsearch.search.aggregations.metrics.sum.InternalSum;
  26. import org.elasticsearch.search.aggregations.metrics.valuecount.InternalValueCount;
  27. import org.slf4j.Logger;
  28. import org.slf4j.LoggerFactory;
  29. import org.springframework.beans.factory.annotation.Autowired;
  30. import org.springframework.stereotype.Component;
  31. import org.springframework.util.StringUtils;
  32. import java.util.*;
  33. /**
  34. * Created by janseny on 2018/9/28.
  35. */
  36. @Component
  37. public class AggregationBuildHandler {
  38. Logger logger = LoggerFactory.getLogger(AggregationBuildHandler.class);
  39. @Autowired
  40. private ViewDimensionDao viewDimensionDao;
  41. @Autowired
  42. private ViewDao viewDao;
  43. @Autowired
  44. private ViewQuotaDao viewQuotaDao;
  45. @Autowired
  46. private ViewQuotaFilterDao viewQuotaFilterDao;
  47. @Autowired
  48. private CubeService cubeService;
  49. /**
  50. * 简单聚合
  51. * @param aggType 聚合类型
  52. * @param aggName 聚合名称
  53. * @param fieldName 聚合字段
  54. * @return
  55. */
  56. public AbstractAggregationBuilder addAggregationBuilder(String aggType,String aggName, String fieldName){
  57. AbstractAggregationBuilder builder = null;
  58. if(aggType.equals("sum")){
  59. builder = AggregationBuilders.sum(aggName).field(fieldName);
  60. }else if(aggType.equals("count")){
  61. builder = AggregationBuilders.count(aggName).field(fieldName);
  62. }else if(aggType.equals("avg")){
  63. builder = AggregationBuilders.avg(aggName).field(fieldName);
  64. }else if(aggType.equals("max")){
  65. builder = AggregationBuilders.max(aggName).field(fieldName);
  66. }else if(aggType.equals("min")){
  67. builder = AggregationBuilders.min(aggName).field(fieldName);
  68. }
  69. return builder;
  70. }
  71. /**
  72. * 分组后,简单聚合
  73. * @param aggType 聚合类型
  74. * @param aggName 聚合名称
  75. * @param fieldName 聚合字段
  76. * @return
  77. */
  78. public AbstractAggregationBuilder addTermAggregationBuilder(String termName, String termFieldName,String aggType,String aggName, String fieldName){
  79. AbstractAggregationBuilder builder = null;
  80. TermsBuilder termsBuilder = addTermsBuilder(termName,termFieldName);
  81. if(aggType.equals("sum")){
  82. builder = termsBuilder.subAggregation(AggregationBuilders.sum(aggName).field(fieldName));
  83. }else if(aggType.equals("count")){
  84. builder = termsBuilder.subAggregation(AggregationBuilders.count(aggName).field(fieldName));
  85. }else if(aggType.equals("avg")){
  86. builder = termsBuilder.subAggregation(AggregationBuilders.avg(aggName).field(fieldName));
  87. }else if(aggType.equals("max")){
  88. builder = termsBuilder.subAggregation(AggregationBuilders.max(aggName).field(fieldName));
  89. }else if(aggType.equals("min")){
  90. builder = termsBuilder.subAggregation(AggregationBuilders.min(aggName).field(fieldName));
  91. }
  92. return builder;
  93. }
  94. /**
  95. * 添加分组
  96. * @param termName
  97. * @param fieldName
  98. * @return
  99. */
  100. public TermsBuilder addTermsBuilder(String termName, String fieldName){
  101. TermsBuilder termBuilder = AggregationBuilders.terms(termName).field(fieldName);
  102. return termBuilder;
  103. }
  104. /**
  105. *
  106. * @param client
  107. * @param boolQueryBuilder 查询的过滤条件
  108. * @param aggBuilderList 聚合组
  109. * 聚合组中成员是 单个的聚合查询,其中单个的聚合查询可以嵌套子聚合查询 如:
  110. * 单个聚合 查询 :TermsBuilder firstAgg= AggregationBuilders.terms("player_count ").field("team");
  111. * 带有子聚合的聚合查询 :
  112. * TermsBuilder secondAgg= AggregationBuilders.terms("pos_count").field("position")
  113. * .subAggregation(
  114. * AggregationBuilders.dateHistogram("by_year").field("dateOfBirth").interval((DateHistogramInterval.YEAR))
  115. * .subAggregation(
  116. * AggregationBuilders.avg("avg_children").field("children")
  117. * )
  118. * );
  119. * @return 聚合结果集
  120. */
  121. public Map<String, Aggregation> structAggregationQuery(TransportClient client,String index,String type,
  122. BoolQueryBuilder boolQueryBuilder,
  123. LinkedList<AbstractAggregationBuilder> aggBuilderList){
  124. SearchRequestBuilder searchRequestBuilder = client.prepareSearch(index).setTypes(type).setQuery(boolQueryBuilder);
  125. for(AbstractAggregationBuilder aggBuilder : aggBuilderList){
  126. searchRequestBuilder.addAggregation(aggBuilder);
  127. }
  128. SearchResponse response = searchRequestBuilder.execute().actionGet();
  129. Map<String, Aggregation> map = response.getAggregations().getAsMap();
  130. client.close();
  131. return map;
  132. }
  133. /**
  134. * 表格型 - 数据结果解析
  135. * @param
  136. * @param map 聚合查询结果
  137. * @return
  138. */
  139. public List<Map<String, Object>> tableDataParsing(View view,Map<String, Aggregation> map){
  140. // List<String> rowList = new LinkedList<>();
  141. // rowList.add("town_terms");
  142. // rowList.add("total");
  143. // List<String> cloumnList = new ArrayList<>();
  144. // cloumnList.add("total_count_result");
  145. // cloumnList.add("sex_terms");
  146. List<String> rowList = getRowDimensionList(view.getId());
  147. List<String> cloumnList = getColumnDimensionList(view.getId());
  148. List<ViewQuota> viewQuotas = viewQuotaDao.findByViewId(view.getId());
  149. Map<String, Object> quotaMap = new HashMap<>();
  150. for(ViewQuota viewQuota :viewQuotas){
  151. List<ViewQuotaFilter> viewQuotaFilters = viewQuotaFilterDao.findByRelationId(viewQuota.getId());
  152. String filter = "noFilter";
  153. if(viewQuotaFilters != null && viewQuotaFilters.size() > 0){
  154. filter = "yesFilter";
  155. }
  156. quotaMap.put(viewQuota.getCode(),filter);
  157. //指标也作为列维度
  158. cloumnList.add(viewQuota.getBasicFormulaType() +"-result");
  159. }
  160. List<Map<String, Object>> resultList = new LinkedList<Map<String, Object>>();
  161. //合计行 数据处理 开始
  162. boolean total = false;
  163. Map<String, Object> totalResultMap = new HashMap<>();
  164. totalResultMap.put(rowList.get(0),"total");//默认第一个行维度
  165. for(String rowAggCode: rowList) {
  166. if (rowAggCode.contains("total")) {
  167. total = true;
  168. for (String cloumnAggCode : cloumnList) {
  169. for(String aggCode:map.keySet()) {
  170. Aggregation aggregation = map.get(aggCode);
  171. if (aggregation.getName().equals("total_" + cloumnAggCode)) {
  172. Map<String, Object> subDataMap = getInternalAggValue(cloumnAggCode, aggregation);
  173. if (subDataMap == null || subDataMap.size() == 0) {
  174. Terms subTerm = (Terms) aggregation;
  175. Map<String, Object> data = getTermAggregationData(subTerm.getBuckets().iterator(), cloumnAggCode);
  176. totalResultMap.putAll(data);
  177. } else {
  178. totalResultMap.putAll(subDataMap);
  179. }
  180. }
  181. }
  182. }
  183. }
  184. }
  185. if(total){
  186. resultList.add(totalResultMap);
  187. //合计行 数据处理 接收
  188. }
  189. //非合计数据处理
  190. for(String rowAggCode: rowList){
  191. if( !rowAggCode.contains("total")){
  192. for(String aggCode:map.keySet()){
  193. Aggregation aggregation = map.get(aggCode);
  194. String aggName = aggregation.getName();
  195. if(rowAggCode.equals(aggName)){
  196. Terms terms = (Terms)aggregation;
  197. String termNmae = terms.getName();
  198. String key = "";
  199. List<Terms.Bucket> buckets = terms.getBuckets();
  200. for(Terms.Bucket bucket : buckets){
  201. Map<String, Object> resultMap = new HashMap<>();
  202. key = bucket.getKeyAsString() != null ? bucket.getKeyAsString() : bucket.getKey().toString();
  203. resultMap.put(termNmae,key);
  204. for(String cloumnAggCode: cloumnList){
  205. List<Aggregation> aggregationList = bucket.getAggregations().asList();
  206. for(Aggregation subAgg : aggregationList){
  207. if(subAgg.getName().equals(cloumnAggCode)){
  208. Map<String, Object> subDataMap = getInternalAggValue(subAgg.getName(),subAgg);
  209. if(subDataMap == null || subDataMap.size() == 0){
  210. Terms subTerm = (Terms)subAgg;
  211. Map<String, Object> data = getTermAggregationData(subTerm.getBuckets().iterator(), cloumnAggCode);
  212. resultMap.putAll(data);
  213. }else {
  214. resultMap.putAll(subDataMap);
  215. }
  216. }
  217. }
  218. }
  219. resultList.add(resultMap);
  220. }
  221. }
  222. }
  223. }
  224. }
  225. return resultList;
  226. }
  227. /**
  228. * 数值型 - 数据结果解析
  229. * @param
  230. * @param map 聚合查询结果
  231. * @return
  232. */
  233. public Map<String, Object> numericalDataParsing(View view,Map<String, Aggregation> map){
  234. Map<String, Object> resultMap = new HashMap<>();
  235. List<ViewQuota> viewQuotas = viewQuotaDao.findByViewId(view.getId());
  236. Map<String, Object> quotaMap = new HashMap<>();
  237. for(ViewQuota viewQuota :viewQuotas){
  238. List<ViewQuotaFilter> viewQuotaFilters = viewQuotaFilterDao.findByRelationId(viewQuota.getId());
  239. String filter = "noFilter";
  240. if(viewQuotaFilters != null && viewQuotaFilters.size() > 0){
  241. filter = "yesFilter";
  242. }
  243. quotaMap.put(viewQuota.getCode(),filter);
  244. }
  245. for(String aggCode:map.keySet()) {
  246. Aggregation aggregation = map.get(aggCode);
  247. for(String quotaCode :quotaMap.keySet()){
  248. String quotaAggCode = quotaCode;
  249. if(quotaMap.get(quotaCode).equals("yesFilter")){
  250. quotaAggCode = quotaCode + "-filter" ;
  251. }
  252. if(!StringUtils.isEmpty(quotaAggCode) && aggCode.contains(quotaAggCode)){
  253. Map<String, Object> dataMap = getInternalAggValue(quotaCode, aggregation);
  254. resultMap.putAll(dataMap);
  255. }
  256. }
  257. }
  258. return resultMap;
  259. }
  260. /**
  261. * 图表型 - 数据结果解析
  262. * 一个展示维度,多个指标组成
  263. * @param
  264. * @param map 聚合查询结果
  265. * @return
  266. */
  267. public Option ehartDataParsing(View view,Map<String, Aggregation> map){
  268. Option option = null;
  269. ReportOption reportOption = new ReportOption();
  270. List<String> xDataList = new LinkedList<>();
  271. List<String> charTypes = new LinkedList<>();
  272. List<String> lineNames = new LinkedList<>();
  273. List<List<Object>> lineDataList = new LinkedList<>();
  274. List<String> rowList = new LinkedList<>();
  275. rowList.add("town_terms");
  276. // rowList = getRowDimensionList(view.getId());
  277. if(rowList.size() != 1){
  278. logger.debug("图表型指标维度配置有误");
  279. return null;
  280. }
  281. String rowAggCode = rowList.get(0);
  282. List<String> quotaList = new LinkedList<>();
  283. Map<String,List<Object>> quotasDataMap = new LinkedHashMap<>();
  284. List<ViewQuota> viewQuotas = new ArrayList<>();
  285. ViewQuota vq = new ViewQuota();
  286. vq.setChartType("1");
  287. vq.setCode("total");
  288. vq.setName("各区县总人数");
  289. viewQuotas.add(vq);
  290. // viewQuotas = viewQuotaDao.findByViewId(view.getId());
  291. for(ViewQuota viewQuota :viewQuotas){
  292. quotaList.add(viewQuota.getCode());
  293. charTypes.add(viewQuota.getChartType());
  294. lineNames.add(viewQuota.getName());
  295. }
  296. Aggregation aggregation = map.get(rowAggCode);
  297. if(rowAggCode.equals(aggregation.getName())){
  298. Terms terms = (Terms)aggregation;
  299. String key = "";
  300. List<Terms.Bucket> buckets = terms.getBuckets();
  301. for(String quotaCode: quotaList){
  302. List<Object> quotaData = new ArrayList<>();
  303. for(Terms.Bucket bucket : buckets){
  304. key = bucket.getKeyAsString() != null ? bucket.getKeyAsString() : bucket.getKey().toString();
  305. xDataList.add(key);
  306. List<Aggregation> aggregationList = bucket.getAggregations().asList();
  307. for(Aggregation subAgg : aggregationList){
  308. if(subAgg.getName().contains(quotaCode)){
  309. Map<String, Object> subDataMap = getInternalAggValue(subAgg.getName(),subAgg);
  310. if(subDataMap != null || subDataMap.size() > 0){
  311. quotaData.add(subDataMap.get(subAgg.getName()));
  312. }
  313. }
  314. }
  315. }
  316. quotasDataMap.put(quotaCode, quotaData);
  317. }
  318. }
  319. for (Map.Entry<String,List<Object>> entry : quotasDataMap.entrySet()) {
  320. lineDataList.add(entry.getValue());
  321. }
  322. option = reportOption.getLineEchartOptionMoreChart(view.getName(), "xName", "yName", xDataList.toArray(), lineDataList , lineNames, charTypes);
  323. return option;
  324. }
  325. /**
  326. * 获取列维度
  327. * @param viewId
  328. * @return
  329. */
  330. private List<String> getColumnDimensionList(String viewId){
  331. List<String> cloumnList = new ArrayList<>();
  332. List<ViewDimension> colDimensionList = viewDimensionDao.getColDimensionList(viewId);
  333. for (ViewDimension colViewDimension : colDimensionList) {
  334. cloumnList.add(colViewDimension.getDimensionCode());
  335. }
  336. return cloumnList;
  337. }
  338. /**
  339. * 获取行维度
  340. * @param viewId
  341. * @return
  342. */
  343. private List<String> getRowDimensionList(String viewId){
  344. List<String> rowList = new ArrayList<>();
  345. // 视图各组组内顶层行维度(升序)
  346. List<ViewDimension> groupTopRowDimensionList = viewDimensionDao.getGroupTopRowDimensionList(viewId);
  347. for (ViewDimension viewDimension : groupTopRowDimensionList) {
  348. String aggType = "-terms";
  349. String dataType = cubeService.findDimensionDataType("cubeCode:1", viewDimension.getDimensionCode());
  350. if(!StringUtils.isEmpty(dataType)){
  351. if(dataType.equals("date")){
  352. aggType = "-date_histogram";
  353. }
  354. }
  355. rowList.add(viewDimension.getDimensionCode() + aggType);
  356. List<ViewDimension> otherRowDimensionList = viewDimensionDao.getGroupOtherRowDimensionList(viewId, viewDimension.getGroupRow());
  357. for (ViewDimension otherViewDimension : otherRowDimensionList) {
  358. rowList.add(otherViewDimension.getDimensionCode() + aggType);;
  359. }
  360. }
  361. return rowList;
  362. }
  363. private List<Aggregation> getAggregationList(Iterator<Terms.Bucket> gradeBucket){
  364. List<Aggregation> aggregationList = new ArrayList<>();
  365. while (gradeBucket.hasNext()) {
  366. Terms.Bucket b = gradeBucket.next();
  367. aggregationList = b.getAggregations().asList();
  368. }
  369. return aggregationList;
  370. }
  371. /**
  372. * 查询未分组-统计值
  373. * @param aggregation
  374. * @return
  375. */
  376. public Map<String, Object> getInternalAggValue(String key, Aggregation aggregation){
  377. Map<String, Object> map = new HashMap<>();
  378. Object value = null;
  379. if (aggregation instanceof InternalValueCount) {
  380. InternalValueCount valueCount = (InternalValueCount) aggregation;
  381. value = valueCount.getValue();
  382. }else if (aggregation instanceof InternalSum) {
  383. InternalSum valueCount = (InternalSum) aggregation;
  384. value = valueCount.getValue();
  385. }else if (aggregation instanceof InternalMax) {
  386. InternalMax valueCount = (InternalMax) aggregation;
  387. value = valueCount.getValue();
  388. }else if (aggregation instanceof InternalMin) {
  389. InternalMin valueCount = (InternalMin) aggregation;
  390. value = valueCount.getValue();
  391. }else if (aggregation instanceof InternalAvg) {
  392. InternalAvg valueCount = (InternalAvg) aggregation;
  393. value = valueCount.getValue();
  394. }else if (aggregation instanceof InternalFilter) {
  395. InternalFilter internalFilter = (InternalFilter) aggregation;
  396. List<Aggregation> aggregationList = internalFilter.getAggregations().asList();
  397. Map<String, Object> filterValue = getInternalAggValue(key,aggregationList.get(0));
  398. return filterValue;
  399. }else if (aggregation instanceof InternalCardinality) {//去重
  400. InternalCardinality valueCount = (InternalCardinality) aggregation;
  401. value = valueCount.getValue();
  402. }
  403. if(value != null){
  404. map.put(key, value);
  405. }
  406. return map;
  407. }
  408. /**
  409. * 查询带分组-统计值
  410. * @param gradeBucket
  411. * @param aggCode
  412. * @return
  413. */
  414. private Map<String, Object> getTermAggregationData(Iterator<Terms.Bucket> gradeBucket ,String aggCode){
  415. Map<String, Object> map = new HashMap<>();
  416. while (gradeBucket.hasNext()) {
  417. Terms.Bucket b = gradeBucket.next();
  418. String subVal = "";
  419. if(b.getKeyAsString() !=null){
  420. subVal = aggCode+ "-" + b.getKeyAsString();
  421. }else {
  422. subVal = aggCode+ "-" + b.getKey();
  423. }
  424. for(Aggregation aggregation : b.getAggregations().asList()){
  425. if (aggregation instanceof InternalValueCount) {
  426. InternalValueCount valueCount = (InternalValueCount) aggregation;
  427. map.put(subVal, valueCount.getValue() );
  428. }else if (aggregation instanceof InternalSum) {
  429. InternalSum valueCount = (InternalSum) aggregation;
  430. map.put(subVal, valueCount.getValue() );
  431. }else if (aggregation instanceof InternalMax) {
  432. InternalMax valueCount = (InternalMax) aggregation;
  433. map.put(subVal, valueCount.getValue());
  434. }else if (aggregation instanceof InternalMin) {
  435. InternalMin valueCount = (InternalMin) aggregation;
  436. map.put(subVal, valueCount.getValue());
  437. }else if (aggregation instanceof InternalAvg) {
  438. InternalAvg valueCount = (InternalAvg) aggregation;
  439. map.put(subVal, valueCount.getValue());
  440. }else {
  441. Terms terms = (Terms)aggregation;
  442. getTermAggregationData(terms.getBuckets().iterator(), aggCode);
  443. }
  444. }
  445. }
  446. return map;
  447. }
  448. }