Browse Source

olap 视图ES查询 相关接口

jkzlzhoujie 6 years ago
parent
commit
2d69714dd1

+ 26 - 1
src/main/java/com/yihu/quota/service/cube/CubeMemberMappingService.java

@ -50,7 +50,7 @@ public class CubeMemberMappingService extends BaseJpaService<CubeMemberMapping,
    }
    /**
     * 根据表编码和字段编码 查询对应的维度信息
     * 根据 表编码和字段编码 查询对应的维度信息
     * @param fieldCode
     * @return
     */
@ -67,6 +67,31 @@ public class CubeMemberMappingService extends BaseJpaService<CubeMemberMapping,
        return cubeMemberMappingModels;
    }
    /**
     * 根据 表编码 查询对应的 保存索引和类型
     * @param tableCode
     * @return
     */
    public Cube findCubeByTableCode(String tableCode) {
        String sql = "SELECT oc.* from olap_cube oc " +
                " LEFT JOIN olap_cube_mapping ocm ON oc.id = ocm.cube_id " +
                " LEFT JOIN olap_data_sources_table_field odst ON odst.id = ocm.data_field_id " +
                " LEFT JOIN olap_data_sources_table ods ON ods.id = odst.table_id  " +
                " where ods.table_code = ? ";
        String param[] = {tableCode};
        List<Cube> cubes = jdbcTemplate.query(sql, new BeanPropertyRowMapper(Cube.class), param);
        if(cubes != null && cubes.size() > 0){
            return cubes.get(0);
        }else {
            return null;
        }
    }
    /**
     * 查询子集中 主键字段
     * @param cubeMappingId
     * @return
     */
    public CubeMemberMapping findCubeMemberMappingPrimary(int cubeMappingId){
        String[] fields = {"cubeMappingId","isPrimarykey"};
        Object[] values = {cubeMappingId,1};

+ 55 - 39
src/main/java/com/yihu/quota/service/cube/ElasticSearchDataProcessService.java

@ -8,6 +8,7 @@ import com.yihu.ehr.util.datetime.DateUtil;
import com.yihu.quota.etl.formula.AgeGroupFunc;
import com.yihu.quota.etl.formula.DictFunc;
import com.yihu.quota.etl.formula.DivisionFunc;
import com.yihu.quota.model.cube.Cube;
import com.yihu.quota.model.cube.CubeMemberMapping;
import com.yihu.quota.util.ElasticSearchHandler;
import com.yihu.quota.vo.CubeMappingModel;
@ -103,16 +104,16 @@ public class ElasticSearchDataProcessService {
            dataMap.remove(rowKey_k);
            dataMap.remove(action_k);
            String keyValue = "";
            for(String hbaseColCode : dataMap.keySet()){
                if(dataMap.get(hbaseColCode)!= null){
                     keyValue = dataMap.get(hbaseColCode).toString();
                }
                List<CubeMappingModel> cubeMappingModels = cubeMappingService.findCubeMappingModelsByFieldCode(table, hbaseColCode);
                if(cubeMappingModels != null && cubeMappingModels.size() > 0){
                    for(CubeMappingModel cubeMappingModel :cubeMappingModels){
                        String index = cubeMappingModel.getIndexName();
                        String type = cubeMappingModel.getIndexType();
                        if(action.contains(action_put)){
            if(action.contains(action_put)){
                for(String hbaseColCode : dataMap.keySet()){
                    if(dataMap.get(hbaseColCode)!= null){
                        keyValue = dataMap.get(hbaseColCode).toString();
                    }
                    List<CubeMappingModel> cubeMappingModels = cubeMappingService.findCubeMappingModelsByFieldCode(table, hbaseColCode);
                    if(cubeMappingModels != null && cubeMappingModels.size() > 0){
                        for(CubeMappingModel cubeMappingModel :cubeMappingModels){
                            String index = cubeMappingModel.getIndexName();
                            String type = cubeMappingModel.getIndexType();
                            String cloumnCode = cubeMappingModel.getDimensionCode();
                            String dataType = cubeMappingModel.getDataType();
                            String dict = cubeMappingModel.getDict();
@ -122,27 +123,35 @@ public class ElasticSearchDataProcessService {
                            Map<String, Object> esDataMap = dimensionDataExtendToMap(keyValue, cloumnCode, dataType, dict, algorithm, algorithmParm);
                            source.putAll(esDataMap);
                            saveElasticSearchData(index, type,rowKey,source);
                        }else if(action.contains(action_del)){
                            elasticSearchUtil.delete(index,type,rowKey);
                        }
                    }
                }
                // 是否是子集属性
                List<CubeMemberMappingModel> cubeMemberMappingModels = cubeMemberMappingService.findCubeMemberMappingModels(table,hbaseColCode);
                if(cubeMemberMappingModels != null && cubeMemberMappingModels.size() > 0){
                    for(CubeMemberMappingModel cubeMemberMappingModel :cubeMemberMappingModels){
                        CubeMemberMapping primaryCubeMember = cubeMemberMappingService.findCubeMemberMappingPrimary(cubeMemberMappingModel.getCubeMappingId());
                        String index = cubeMemberMappingModel.getIndexName();
                        String type = cubeMemberMappingModel.getIndexType();
                        String primaryKey = primaryCubeMember.getDimensionCode();
                        String subRowKey = rowKey;
                        if(action.contains(action_put)){
                            //维度成员数据扩展保存
                            source = dimensionMemberDataExtendToMap(cubeMemberMappingModel,keyValue,subRowKey,profileId,primaryKey);
                            saveElasticSearchData(index, type,profileId,source);
                    //如果存在维度的话 rowkey 赋值给 profileId
                    if(cubeMappingModels != null){
                        profileId = rowKey;
                    }
                    // 是否是子集属性
                    List<CubeMemberMappingModel> cubeMemberMappingModels = cubeMemberMappingService.findCubeMemberMappingModels(table,hbaseColCode);
                    if(cubeMemberMappingModels != null && cubeMemberMappingModels.size() > 0){
                        for(CubeMemberMappingModel cubeMemberMappingModel :cubeMemberMappingModels){
                            String index = cubeMemberMappingModel.getIndexName();
                            String type = cubeMemberMappingModel.getIndexType();
                            String subRowKey = rowKey;
                            if(action.contains(action_put)){
                                //维度成员数据扩展保存
                                source = dimensionMemberDataExtendToMap(cubeMemberMappingModel,keyValue,subRowKey,profileId);
                                saveElasticSearchData(index, type,profileId,source);
                            }
                        }
                    }
                }
            }else if(action.contains(action_del)){
                //一个表只能对应到一个 索引type
                Cube cube = cubeMemberMappingService.findCubeByTableCode(table);
                if(cube != null){
                    elasticSearchUtil.delete(cube.getIndexType(),cube.getIndexType(),rowKey);
                }else {
                    throw new Exception("视图,表不存在");
                }
            }
        }catch (ParseException e){
            logger.debug("elasticSearch 执行失败");
@ -174,10 +183,6 @@ public class ElasticSearchDataProcessService {
            if(algorithm.equals("AgeGroupFunc")){
                source = extendAgeGroupData(source,cloumnCode,dict,cloumnValue);
            }
            //区域算法
            if(algorithm.equals("DivisionFunc") && StringUtils.isNotEmpty(algorithmParm)){
                source = extendDivisionData(source, cloumnCode, algorithmParm, cloumnValue);
            }
            //其他算法 --
        }else {
            source.put(cloumnCode,dataConver(dataType,cloumnValue));
@ -192,8 +197,7 @@ public class ElasticSearchDataProcessService {
     * @param subRowKey
     * @return
     */
    public Map<String,Object> dimensionMemberDataExtendToMap(CubeMemberMappingModel cubeMemberMappingModel,String cloumnValue,String subRowKey
            ,String profileId,String primaryKey) throws Exception {
    public Map<String,Object> dimensionMemberDataExtendToMap(CubeMemberMappingModel cubeMemberMappingModel,String cloumnValue,String subRowKey,String profileId) throws Exception {
        Map<String, Object> source  = new HashMap<>();
        String cloumnCode = cubeMemberMappingModel.getDimensionCode();
        String parentCode = cubeMemberMappingModel.getParentCode();
@ -204,19 +208,31 @@ public class ElasticSearchDataProcessService {
        int childSaveType = cubeMemberMappingModel.getChildSaveType();
        String index = cubeMemberMappingModel.getIndexName();
        String type = cubeMemberMappingModel.getIndexType();
        if(childSaveType == 1 ){//对象方式
            Map<String, Object> objChildMap  = new HashMap<>();
            objChildMap.put(cloumnCode,cloumnValue);
            if(StringUtils.isNotEmpty(dict)){
                objChildMap = extendDictData(objChildMap,cloumnCode,dict,cloumnValue);
                source.put(parentCode,objChildMap);
            }else if(StringUtils.isNotEmpty(algorithm)){
                //区域算法
                if(algorithm.equals("DivisionFunc") && StringUtils.isNotEmpty(algorithmParm)){
                    objChildMap = extendDivisionData(objChildMap, cloumnCode, algorithmParm, cloumnValue);
                }
                source.put(parentCode,objChildMap);
                //其他算法 --
                //其他算法 统一改为反射的方式 计算 TODO
            }else {
                objChildMap.put(cloumnCode,cloumnValue);
            }
            source.put(parentCode,objChildMap);
        }else if(childSaveType == 2 ){//nested 方式
            //查找子集主键字段
            CubeMemberMapping primaryCubeMember = cubeMemberMappingService.findCubeMemberMappingPrimary(cubeMemberMappingModel.getCubeMappingId());
            String primaryKeyCode = "";
            if(primaryCubeMember != null ){
                primaryKeyCode = primaryCubeMember.getDimensionCode();
            }else {
                return null;
            }
            List<Map<String,Object>> nestedList = new ArrayList<>();
            //查出历史数据 然后组合保存
            Map<String, Object> oldMataMap = elasticSearchUtil.findById(index, type, profileId);
@ -226,8 +242,8 @@ public class ElasticSearchDataProcessService {
                if(childList != null && childList.size() > 0){
                    boolean isexist = false;
                    for(Map<String, Object> map : childList){
                        if(subRowKey.equals(map.get(primaryKey).toString())){
                            map.put(primaryKey,subRowKey);
                        if(subRowKey.equals(map.get(primaryKeyCode).toString())){
                            map.put(primaryKeyCode,subRowKey);
                            map.put(cloumnCode,dataConver(dataType,cloumnValue));
                            if(StringUtils.isNotEmpty(dict)){
                                map = extendDictData(map,cloumnCode,dict,cloumnValue);
@ -240,7 +256,7 @@ public class ElasticSearchDataProcessService {
                    }
                    if( !isexist){
                        Map<String,Object> map = new HashMap<>();
                        map.put(primaryKey,subRowKey);
                        map.put(primaryKeyCode,subRowKey);
                        map.put(cloumnCode,dataConver(dataType,cloumnValue));
                        if(StringUtils.isNotEmpty(dict)){
                            map = extendDictData(map,cloumnCode,dict,cloumnValue);
@ -252,7 +268,7 @@ public class ElasticSearchDataProcessService {
                    source.put(parentCode, nestedList);
                }else{
                    Map<String,Object> map = new HashMap<>();
                    map.put(primaryKey,subRowKey);
                    map.put(primaryKeyCode,subRowKey);
                    map.put(cloumnCode,dataConver(dataType,cloumnValue));
                    nestedList.add(map);
                    source.put(parentCode, nestedList);

+ 122 - 0
src/main/java/com/yihu/quota/util/AggregationBuildHandler.java

@ -0,0 +1,122 @@
package com.yihu.quota.util;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.search.aggregations.AbstractAggregationBuilder;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.bucket.terms.TermsBuilder;
import org.springframework.stereotype.Component;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
/**
 * Created by janseny on 2018/9/28.
 */
@Component
public class AggregationBuildHandler {
    /**
     * 简单聚合
     * @param aggType 聚合类型
     * @param aggName 聚合名称
     * @param fieldName 聚合字段
     * @return
     */
    public AbstractAggregationBuilder addAggregationBuilder(String aggType,String aggName, String fieldName){
        AbstractAggregationBuilder builder = null;
        if(aggType.equals("sum")){
            builder = AggregationBuilders.sum(aggName).field(fieldName);
        }else if(aggType.equals("count")){
            builder = AggregationBuilders.count(aggName).field(fieldName);
        }else if(aggType.equals("avg")){
            builder = AggregationBuilders.avg(aggName).field(fieldName);
        }else if(aggType.equals("max")){
            builder = AggregationBuilders.max(aggName).field(fieldName);
        }else if(aggType.equals("min")){
            builder = AggregationBuilders.min(aggName).field(fieldName);
        }
        return  builder;
    }
    /**
     * 分组后,简单聚合
     * @param aggType 聚合类型
     * @param aggName 聚合名称
     * @param fieldName 聚合字段
     * @return
     */
    public AbstractAggregationBuilder addTermAggregationBuilder(String termName, String termFieldName,String aggType,String aggName, String fieldName){
        AbstractAggregationBuilder builder = null;
        TermsBuilder termsBuilder = addTermsBuilder(termName,termFieldName);
        if(aggType.equals("sum")){
            builder = termsBuilder.subAggregation(AggregationBuilders.sum(aggName).field(fieldName));
        }else if(aggType.equals("count")){
            builder = termsBuilder.subAggregation(AggregationBuilders.count(aggName).field(fieldName));
        }else if(aggType.equals("avg")){
            builder = termsBuilder.subAggregation(AggregationBuilders.avg(aggName).field(fieldName));
        }else if(aggType.equals("max")){
            builder = termsBuilder.subAggregation(AggregationBuilders.max(aggName).field(fieldName));
        }else if(aggType.equals("min")){
            builder = termsBuilder.subAggregation(AggregationBuilders.min(aggName).field(fieldName));
        }
        return  builder;
    }
    /**
     * 添加分组
     * @param termName
     * @param fieldName
     * @return
     */
    public TermsBuilder addTermsBuilder(String termName, String fieldName){
        TermsBuilder termsBuilder= AggregationBuilders.terms(termName).field(fieldName);
        return termsBuilder;
    }
    /**
     *
     * @param client
     * @param boolQueryBuilder  查询的过滤条件
     * @param aggBuilderList  聚合组
     *  聚合组中成员是 单个的聚合查询,其中单个的聚合查询可以嵌套子聚合查询 如:
     *  单个聚合 查询 :TermsBuilder firstAgg= AggregationBuilders.terms("player_count ").field("team");
     *  带有子聚合的聚合查询 :
     *    TermsBuilder secondAgg= AggregationBuilders.terms("pos_count").field("position")
     *      .subAggregation(
     *          AggregationBuilders.dateHistogram("by_year").field("dateOfBirth").interval((DateHistogramInterval.YEAR))
     *              .subAggregation(
     *                  AggregationBuilders.avg("avg_children").field("children")
     *              )
     *          );
     * @return 结果集
     */
    public List<Map<String, Object>> structAggregationQuery(TransportClient client,String index,String type,
                                                            BoolQueryBuilder boolQueryBuilder,
                                                            LinkedList<AbstractAggregationBuilder> aggBuilderList){
        SearchRequestBuilder searchRequestBuilder = client.prepareSearch(index).setTypes(type).
                setQuery(boolQueryBuilder);
        for(AbstractAggregationBuilder aggBuilder : aggBuilderList){
            searchRequestBuilder.addAggregation(aggBuilder);
        }
        SearchResponse sr = searchRequestBuilder.execute().actionGet();
        List<Map<String, Object>> matchRsult = new LinkedList<Map<String, Object>>();
        //数据解析 TODO
//        ValueCount sum = sr.getAggregations().get("count_result");
//        double v = sum.getValue();
        client.close();
        return matchRsult;
    }
}