123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484 |
- package com.yihu.quota.util;
- import com.github.abel533.echarts.Option;
- import com.yihu.quota.dao.view.ViewDao;
- import com.yihu.quota.dao.view.ViewDimensionDao;
- import com.yihu.quota.dao.view.ViewQuotaDao;
- import com.yihu.quota.dao.view.ViewQuotaFilterDao;
- import com.yihu.quota.model.view.View;
- import com.yihu.quota.model.view.ViewDimension;
- import com.yihu.quota.model.view.ViewQuota;
- import com.yihu.quota.model.view.ViewQuotaFilter;
- import com.yihu.quota.service.cube.CubeService;
- import org.elasticsearch.action.search.SearchRequestBuilder;
- import org.elasticsearch.action.search.SearchResponse;
- import org.elasticsearch.client.transport.TransportClient;
- import org.elasticsearch.index.query.BoolQueryBuilder;
- import org.elasticsearch.search.aggregations.AbstractAggregationBuilder;
- import org.elasticsearch.search.aggregations.Aggregation;
- import org.elasticsearch.search.aggregations.AggregationBuilders;
- import org.elasticsearch.search.aggregations.bucket.filter.InternalFilter;
- import org.elasticsearch.search.aggregations.bucket.terms.*;
- import org.elasticsearch.search.aggregations.metrics.avg.InternalAvg;
- import org.elasticsearch.search.aggregations.metrics.cardinality.InternalCardinality;
- import org.elasticsearch.search.aggregations.metrics.max.InternalMax;
- import org.elasticsearch.search.aggregations.metrics.min.InternalMin;
- import org.elasticsearch.search.aggregations.metrics.sum.InternalSum;
- import org.elasticsearch.search.aggregations.metrics.valuecount.InternalValueCount;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.stereotype.Component;
- import org.springframework.util.StringUtils;
- import java.util.*;
- /**
- * Created by janseny on 2018/9/28.
- */
- @Component
- public class AggregationBuildHandler {
- Logger logger = LoggerFactory.getLogger(AggregationBuildHandler.class);
- @Autowired
- private ViewDimensionDao viewDimensionDao;
- @Autowired
- private ViewDao viewDao;
- @Autowired
- private ViewQuotaDao viewQuotaDao;
- @Autowired
- private ViewQuotaFilterDao viewQuotaFilterDao;
- @Autowired
- private CubeService cubeService;
- /**
- * 简单聚合
- * @param aggType 聚合类型
- * @param aggName 聚合名称
- * @param fieldName 聚合字段
- * @return
- */
- public AbstractAggregationBuilder addAggregationBuilder(String aggType,String aggName, String fieldName){
- AbstractAggregationBuilder builder = null;
- if(aggType.equals("sum")){
- builder = AggregationBuilders.sum(aggName).field(fieldName);
- }else if(aggType.equals("count")){
- builder = AggregationBuilders.count(aggName).field(fieldName);
- }else if(aggType.equals("avg")){
- builder = AggregationBuilders.avg(aggName).field(fieldName);
- }else if(aggType.equals("max")){
- builder = AggregationBuilders.max(aggName).field(fieldName);
- }else if(aggType.equals("min")){
- builder = AggregationBuilders.min(aggName).field(fieldName);
- }
- return builder;
- }
- /**
- * 分组后,简单聚合
- * @param aggType 聚合类型
- * @param aggName 聚合名称
- * @param fieldName 聚合字段
- * @return
- */
- public AbstractAggregationBuilder addTermAggregationBuilder(String termName, String termFieldName,String aggType,String aggName, String fieldName){
- AbstractAggregationBuilder builder = null;
- TermsBuilder termsBuilder = addTermsBuilder(termName,termFieldName);
- if(aggType.equals("sum")){
- builder = termsBuilder.subAggregation(AggregationBuilders.sum(aggName).field(fieldName));
- }else if(aggType.equals("count")){
- builder = termsBuilder.subAggregation(AggregationBuilders.count(aggName).field(fieldName));
- }else if(aggType.equals("avg")){
- builder = termsBuilder.subAggregation(AggregationBuilders.avg(aggName).field(fieldName));
- }else if(aggType.equals("max")){
- builder = termsBuilder.subAggregation(AggregationBuilders.max(aggName).field(fieldName));
- }else if(aggType.equals("min")){
- builder = termsBuilder.subAggregation(AggregationBuilders.min(aggName).field(fieldName));
- }
- return builder;
- }
- /**
- * 添加分组
- * @param termName
- * @param fieldName
- * @return
- */
- public TermsBuilder addTermsBuilder(String termName, String fieldName){
- TermsBuilder termBuilder = AggregationBuilders.terms(termName).field(fieldName);
- return termBuilder;
- }
- /**
- *
- * @param client
- * @param boolQueryBuilder 查询的过滤条件
- * @param aggBuilderList 聚合组
- * 聚合组中成员是 单个的聚合查询,其中单个的聚合查询可以嵌套子聚合查询 如:
- * 单个聚合 查询 :TermsBuilder firstAgg= AggregationBuilders.terms("player_count ").field("team");
- * 带有子聚合的聚合查询 :
- * TermsBuilder secondAgg= AggregationBuilders.terms("pos_count").field("position")
- * .subAggregation(
- * AggregationBuilders.dateHistogram("by_year").field("dateOfBirth").interval((DateHistogramInterval.YEAR))
- * .subAggregation(
- * AggregationBuilders.avg("avg_children").field("children")
- * )
- * );
- * @return 聚合结果集
- */
- public Map<String, Aggregation> structAggregationQuery(TransportClient client,String index,String type,
- BoolQueryBuilder boolQueryBuilder,
- LinkedList<AbstractAggregationBuilder> aggBuilderList){
- SearchRequestBuilder searchRequestBuilder = client.prepareSearch(index).setTypes(type).setQuery(boolQueryBuilder);
- for(AbstractAggregationBuilder aggBuilder : aggBuilderList){
- searchRequestBuilder.addAggregation(aggBuilder);
- }
- SearchResponse response = searchRequestBuilder.execute().actionGet();
- Map<String, Aggregation> map = response.getAggregations().getAsMap();
- client.close();
- return map;
- }
- /**
- * 表格型 - 数据结果解析
- * @param
- * @param map 聚合查询结果
- * @return
- */
- public List<Map<String, Object>> tableDataParsing(View view,Map<String, Aggregation> map){
- // List<String> rowList = new LinkedList<>();
- // rowList.add("town_terms");
- // rowList.add("total");
- // List<String> cloumnList = new ArrayList<>();
- // cloumnList.add("total_count_result");
- // cloumnList.add("sex_terms");
- List<String> rowList = getRowDimensionList(view.getId());
- List<String> cloumnList = getColumnDimensionList(view.getId());
- List<ViewQuota> viewQuotas = viewQuotaDao.findByViewId(view.getId());
- Map<String, Object> quotaMap = new HashMap<>();
- for(ViewQuota viewQuota :viewQuotas){
- List<ViewQuotaFilter> viewQuotaFilters = viewQuotaFilterDao.findByRelationId(viewQuota.getId());
- String filter = "noFilter";
- if(viewQuotaFilters != null && viewQuotaFilters.size() > 0){
- filter = "yesFilter";
- }
- quotaMap.put(viewQuota.getCode(),filter);
- //指标也作为列维度
- cloumnList.add(viewQuota.getBasicFormulaType() +"-result");
- }
- List<Map<String, Object>> resultList = new LinkedList<Map<String, Object>>();
- //合计行 数据处理 开始
- boolean total = false;
- Map<String, Object> totalResultMap = new HashMap<>();
- totalResultMap.put(rowList.get(0),"total");//默认第一个行维度
- for(String rowAggCode: rowList) {
- if (rowAggCode.contains("total")) {
- total = true;
- for (String cloumnAggCode : cloumnList) {
- for(String aggCode:map.keySet()) {
- Aggregation aggregation = map.get(aggCode);
- if (aggregation.getName().equals("total_" + cloumnAggCode)) {
- Map<String, Object> subDataMap = getInternalAggValue(cloumnAggCode, aggregation);
- if (subDataMap == null || subDataMap.size() == 0) {
- Terms subTerm = (Terms) aggregation;
- Map<String, Object> data = getTermAggregationData(subTerm.getBuckets().iterator(), cloumnAggCode);
- totalResultMap.putAll(data);
- } else {
- totalResultMap.putAll(subDataMap);
- }
- }
- }
- }
- }
- }
- if(total){
- resultList.add(totalResultMap);
- //合计行 数据处理 接收
- }
- //非合计数据处理
- for(String rowAggCode: rowList){
- if( !rowAggCode.contains("total")){
- for(String aggCode:map.keySet()){
- Aggregation aggregation = map.get(aggCode);
- String aggName = aggregation.getName();
- if(rowAggCode.equals(aggName)){
- Terms terms = (Terms)aggregation;
- String termNmae = terms.getName();
- String key = "";
- List<Terms.Bucket> buckets = terms.getBuckets();
- for(Terms.Bucket bucket : buckets){
- Map<String, Object> resultMap = new HashMap<>();
- key = bucket.getKeyAsString() != null ? bucket.getKeyAsString() : bucket.getKey().toString();
- resultMap.put(termNmae,key);
- for(String cloumnAggCode: cloumnList){
- List<Aggregation> aggregationList = bucket.getAggregations().asList();
- for(Aggregation subAgg : aggregationList){
- if(subAgg.getName().equals(cloumnAggCode)){
- Map<String, Object> subDataMap = getInternalAggValue(subAgg.getName(),subAgg);
- if(subDataMap == null || subDataMap.size() == 0){
- Terms subTerm = (Terms)subAgg;
- Map<String, Object> data = getTermAggregationData(subTerm.getBuckets().iterator(), cloumnAggCode);
- resultMap.putAll(data);
- }else {
- resultMap.putAll(subDataMap);
- }
- }
- }
- }
- resultList.add(resultMap);
- }
- }
- }
- }
- }
- return resultList;
- }
- /**
- * 数值型 - 数据结果解析
- * @param
- * @param map 聚合查询结果
- * @return
- */
- public Map<String, Object> numericalDataParsing(View view,Map<String, Aggregation> map){
- Map<String, Object> resultMap = new HashMap<>();
- List<ViewQuota> viewQuotas = viewQuotaDao.findByViewId(view.getId());
- Map<String, Object> quotaMap = new HashMap<>();
- for(ViewQuota viewQuota :viewQuotas){
- List<ViewQuotaFilter> viewQuotaFilters = viewQuotaFilterDao.findByRelationId(viewQuota.getId());
- String filter = "noFilter";
- if(viewQuotaFilters != null && viewQuotaFilters.size() > 0){
- filter = "yesFilter";
- }
- quotaMap.put(viewQuota.getCode(),filter);
- }
- for(String aggCode:map.keySet()) {
- Aggregation aggregation = map.get(aggCode);
- for(String quotaCode :quotaMap.keySet()){
- String quotaAggCode = quotaCode;
- if(quotaMap.get(quotaCode).equals("yesFilter")){
- quotaAggCode = quotaCode + "-filter" ;
- }
- if(!StringUtils.isEmpty(quotaAggCode) && aggCode.contains(quotaAggCode)){
- Map<String, Object> dataMap = getInternalAggValue(quotaCode, aggregation);
- resultMap.putAll(dataMap);
- }
- }
- }
- return resultMap;
- }
- /**
- * 图表型 - 数据结果解析
- * 一个展示维度,多个指标组成
- * @param
- * @param map 聚合查询结果
- * @return
- */
- public Option ehartDataParsing(View view,Map<String, Aggregation> map){
- Option option = null;
- ReportOption reportOption = new ReportOption();
- List<String> xDataList = new LinkedList<>();
- List<String> charTypes = new LinkedList<>();
- List<String> lineNames = new LinkedList<>();
- List<List<Object>> lineDataList = new LinkedList<>();
- List<String> rowList = new LinkedList<>();
- rowList.add("town_terms");
- // rowList = getRowDimensionList(view.getId());
- if(rowList.size() != 1){
- logger.debug("图表型指标维度配置有误");
- return null;
- }
- String rowAggCode = rowList.get(0);
- List<String> quotaList = new LinkedList<>();
- Map<String,List<Object>> quotasDataMap = new LinkedHashMap<>();
- List<ViewQuota> viewQuotas = new ArrayList<>();
- ViewQuota vq = new ViewQuota();
- vq.setChartType("1");
- vq.setCode("total");
- vq.setName("各区县总人数");
- viewQuotas.add(vq);
- // viewQuotas = viewQuotaDao.findByViewId(view.getId());
- for(ViewQuota viewQuota :viewQuotas){
- quotaList.add(viewQuota.getCode());
- charTypes.add(viewQuota.getChartType());
- lineNames.add(viewQuota.getName());
- }
- Aggregation aggregation = map.get(rowAggCode);
- if(rowAggCode.equals(aggregation.getName())){
- Terms terms = (Terms)aggregation;
- String key = "";
- List<Terms.Bucket> buckets = terms.getBuckets();
- for(String quotaCode: quotaList){
- List<Object> quotaData = new ArrayList<>();
- for(Terms.Bucket bucket : buckets){
- key = bucket.getKeyAsString() != null ? bucket.getKeyAsString() : bucket.getKey().toString();
- xDataList.add(key);
- List<Aggregation> aggregationList = bucket.getAggregations().asList();
- for(Aggregation subAgg : aggregationList){
- if(subAgg.getName().contains(quotaCode)){
- Map<String, Object> subDataMap = getInternalAggValue(subAgg.getName(),subAgg);
- if(subDataMap != null || subDataMap.size() > 0){
- quotaData.add(subDataMap.get(subAgg.getName()));
- }
- }
- }
- }
- quotasDataMap.put(quotaCode, quotaData);
- }
- }
- for (Map.Entry<String,List<Object>> entry : quotasDataMap.entrySet()) {
- lineDataList.add(entry.getValue());
- }
- option = reportOption.getLineEchartOptionMoreChart(view.getName(), "xName", "yName", xDataList.toArray(), lineDataList , lineNames, charTypes);
- return option;
- }
- /**
- * 获取列维度
- * @param viewId
- * @return
- */
- private List<String> getColumnDimensionList(String viewId){
- List<String> cloumnList = new ArrayList<>();
- List<ViewDimension> colDimensionList = viewDimensionDao.getColDimensionList(viewId);
- for (ViewDimension colViewDimension : colDimensionList) {
- cloumnList.add(colViewDimension.getDimensionCode());
- }
- return cloumnList;
- }
- /**
- * 获取行维度
- * @param viewId
- * @return
- */
- private List<String> getRowDimensionList(String viewId){
- List<String> rowList = new ArrayList<>();
- // 视图各组组内顶层行维度(升序)
- List<ViewDimension> groupTopRowDimensionList = viewDimensionDao.getGroupTopRowDimensionList(viewId);
- for (ViewDimension viewDimension : groupTopRowDimensionList) {
- String aggType = "-terms";
- String dataType = cubeService.findDimensionDataType("cubeCode:1", viewDimension.getDimensionCode());
- if(!StringUtils.isEmpty(dataType)){
- if(dataType.equals("date")){
- aggType = "-date_histogram";
- }
- }
- rowList.add(viewDimension.getDimensionCode() + aggType);
- List<ViewDimension> otherRowDimensionList = viewDimensionDao.getGroupOtherRowDimensionList(viewId, viewDimension.getGroupRow());
- for (ViewDimension otherViewDimension : otherRowDimensionList) {
- rowList.add(otherViewDimension.getDimensionCode() + aggType);;
- }
- }
- return rowList;
- }
- private List<Aggregation> getAggregationList(Iterator<Terms.Bucket> gradeBucket){
- List<Aggregation> aggregationList = new ArrayList<>();
- while (gradeBucket.hasNext()) {
- Terms.Bucket b = gradeBucket.next();
- aggregationList = b.getAggregations().asList();
- }
- return aggregationList;
- }
- /**
- * 查询未分组-统计值
- * @param aggregation
- * @return
- */
- public Map<String, Object> getInternalAggValue(String key, Aggregation aggregation){
- Map<String, Object> map = new HashMap<>();
- Object value = null;
- if (aggregation instanceof InternalValueCount) {
- InternalValueCount valueCount = (InternalValueCount) aggregation;
- value = valueCount.getValue();
- }else if (aggregation instanceof InternalSum) {
- InternalSum valueCount = (InternalSum) aggregation;
- value = valueCount.getValue();
- }else if (aggregation instanceof InternalMax) {
- InternalMax valueCount = (InternalMax) aggregation;
- value = valueCount.getValue();
- }else if (aggregation instanceof InternalMin) {
- InternalMin valueCount = (InternalMin) aggregation;
- value = valueCount.getValue();
- }else if (aggregation instanceof InternalAvg) {
- InternalAvg valueCount = (InternalAvg) aggregation;
- value = valueCount.getValue();
- }else if (aggregation instanceof InternalFilter) {
- InternalFilter internalFilter = (InternalFilter) aggregation;
- List<Aggregation> aggregationList = internalFilter.getAggregations().asList();
- Map<String, Object> filterValue = getInternalAggValue(key,aggregationList.get(0));
- return filterValue;
- }else if (aggregation instanceof InternalCardinality) {//去重
- InternalCardinality valueCount = (InternalCardinality) aggregation;
- value = valueCount.getValue();
- }
- if(value != null){
- map.put(key, value);
- }
- return map;
- }
- /**
- * 查询带分组-统计值
- * @param gradeBucket
- * @param aggCode
- * @return
- */
- private Map<String, Object> getTermAggregationData(Iterator<Terms.Bucket> gradeBucket ,String aggCode){
- Map<String, Object> map = new HashMap<>();
- while (gradeBucket.hasNext()) {
- Terms.Bucket b = gradeBucket.next();
- String subVal = "";
- if(b.getKeyAsString() !=null){
- subVal = aggCode+ "-" + b.getKeyAsString();
- }else {
- subVal = aggCode+ "-" + b.getKey();
- }
- for(Aggregation aggregation : b.getAggregations().asList()){
- if (aggregation instanceof InternalValueCount) {
- InternalValueCount valueCount = (InternalValueCount) aggregation;
- map.put(subVal, valueCount.getValue() );
- }else if (aggregation instanceof InternalSum) {
- InternalSum valueCount = (InternalSum) aggregation;
- map.put(subVal, valueCount.getValue() );
- }else if (aggregation instanceof InternalMax) {
- InternalMax valueCount = (InternalMax) aggregation;
- map.put(subVal, valueCount.getValue());
- }else if (aggregation instanceof InternalMin) {
- InternalMin valueCount = (InternalMin) aggregation;
- map.put(subVal, valueCount.getValue());
- }else if (aggregation instanceof InternalAvg) {
- InternalAvg valueCount = (InternalAvg) aggregation;
- map.put(subVal, valueCount.getValue());
- }else {
- Terms terms = (Terms)aggregation;
- getTermAggregationData(terms.getBuckets().iterator(), aggCode);
- }
- }
- }
- return map;
- }
- }
|