Browse Source

数据解析 增加 数据关联查询

jkzlzhoujie 6 years ago
parent
commit
2338ccadf6

+ 1 - 0
src/main/java/com/yihu/quota/kafka/ConsumerListener.java

@ -16,6 +16,7 @@ public class ConsumerListener {
    @KafkaListener(topics = "sep-hbase-data")
    public void loadData(ConsumerRecord<?, ?> record) {
        if(record.value() != null){
            System.out.println("消息:" + record.value().toString());
            elasticSearchDataProcessService.saveData(record.value().toString());
        }
    }

+ 74 - 7
src/main/java/com/yihu/quota/service/cube/ElasticSearchDataProcessService.java

@ -33,13 +33,15 @@ public class ElasticSearchDataProcessService {
    private static String dataSource_hbase = "hbase";
    private static String dataSource_mysql = "mysql";
    private static String action_del = "DeleteFamily";//删除整行
    private static String action_put = "Put";                //添加和修改单个字段值
    private static String action_delAll = "DelAll";
    private static String action_put = "Put";         //添加和修改单个字段值
    private static String action_delAll = "DelAll"; //删除整张表数据
    private static String action_putAll = "PutAll"; //添加整张表数据
    private static String dataSource_k = "dataSource";
    private static String database_k = "database";
    private static String table_k = "table";
    private static String id_k = "_id";
    private static String cubeId_k = "cubeId";
    private static String rowKey_k = "rowkey";
    private static String profileId_k = "profile_id";
    private static String action_k = "action";
@ -90,6 +92,7 @@ public class ElasticSearchDataProcessService {
        String table = "";
        String rowKey = "";
        String profileId = "";
        String cubeId = "";
        String action = "";
        String database = "";
        if (dataMap.containsKey(database_k)) {
@ -101,6 +104,9 @@ public class ElasticSearchDataProcessService {
        if(dataMap.containsKey(rowKey_k)){
            rowKey = dataMap.get(rowKey_k).toString();
        }
        if(dataMap.containsKey(cubeId_k)){
            cubeId = dataMap.get(cubeId_k).toString();
        }
        if(dataMap.containsKey(profileId_k)){
            profileId = dataMap.get(profileId_k).toString();
        }
@ -112,7 +118,7 @@ public class ElasticSearchDataProcessService {
            dataMap.remove(rowKey_k);
            dataMap.remove(action_k);
            String baseCloumnValue = null;
            if(action.contains(action_put)){
            if(action.equals(action_put)){
                for(String baseCloumnCode : dataMap.keySet()){
                    if(dataMap.get(baseCloumnCode)!= null){
                        baseCloumnValue = dataMap.get(baseCloumnCode).toString();
@ -181,9 +187,8 @@ public class ElasticSearchDataProcessService {
                            }
                        }
                    }
                    //主表字段关联出相关表数据 end
                }
            }else if(action.contains(action_del)){
            }else if(action.equals(action_del)){
                //一个表只能对应到一个 索引type
                Cube cube = cubeMemberMappingService.findCubeByTableCode(table);
                if(cube != null){
@ -191,10 +196,69 @@ public class ElasticSearchDataProcessService {
                }else {
                    throw new Exception("视图,表不存在");
                }
            }else if (action.equals(action_putAll)) {
                if (dataMap.containsKey("cubeId")) {
                    Cube cube = this.getIndexTypeById(cubeId);
                    if (null != cube) {
                        String index = cube.getIndexName();
                        String type = cube.getIndexType();
                        Map<String, Object> source  = new HashMap<>();
                        source.put(id_k,rowKey);
                        source.put(rowKey_k,rowKey);
                        for(String baseCloumnCode : dataMap.keySet()){
                            System.out.println("列:" + baseCloumnCode );
                            if(dataMap.get(baseCloumnCode)!= null){
                                baseCloumnValue = dataMap.get(baseCloumnCode).toString();
                                List<CubeMappingModel> cubeMappingModels = cubeMappingService.findCubeMappingModelsByFieldCode(table, baseCloumnCode);
                                if(cubeMappingModels != null && cubeMappingModels.size() > 0){
                                    for(CubeMappingModel cubeMappingModel :cubeMappingModels){
                                        source.putAll(dimensionDataExtendToMap(cubeMappingModel,cubeMappingModel.getDimensionCode(),baseCloumnValue));
                                    }
                                }
                                // 是否是子集属性
                                profileId = rowKey;
                                String subRowKey = rowKey;
                                List<CubeMemberMappingModel> cubeMemberMappingModels = cubeMemberMappingService.findCubeMemberMappingModels(table,baseCloumnCode);
                                if(cubeMemberMappingModels != null && cubeMemberMappingModels.size() > 0){
                                    for(CubeMemberMappingModel cubeMemberMappingModel :cubeMemberMappingModels){
                                        source.putAll(dimensionMemberDataExtendToMap(cubeMemberMappingModel,baseCloumnValue,subRowKey,profileId));
                                    }
                                }
//                              //主表字段关联出相关表数据 start
                                List<CubeMemberMappingModel> relationMemberMappingModels = cubeMemberMappingService.findRelationMemberMappingModels(table,baseCloumnCode);
                                if(relationMemberMappingModels != null && relationMemberMappingModels.size() > 0) {
                                    for (CubeMemberMappingModel cubeMemberMappingModel : relationMemberMappingModels) {
                                        //父级对象
                                        if(cubeMemberMappingModel.getChildSaveType() == 1 && cubeMemberMappingModel.getRelationSubFieldId() != null){
                                            DataSourcesTableModel dtm = dataSourcesTableFieldService.findDataSourcesTableModel(cubeMemberMappingModel.getDataFieldId());
                                            DataSourcesTableModel subDtm = dataSourcesTableFieldService.findDataSourcesTableModel(cubeMemberMappingModel.getRelationSubFieldId());
                                            if(dtm != null && subDtm != null){
                                                String value = jdbcBasicService.getEntityByRelationId(subDtm.getDatabaseName() ,subDtm.getTableCode(), dtm.getFieldCode(),subDtm.getFieldCode(),subDtm.getFieldType(),baseCloumnValue);
                                                if(StringUtils.isNotEmpty(value)){
                                                    String parentCode = cubeMemberMappingModel.getParentCode();
                                                    Map<String,Object> childMap = new HashMap<>();
                                                    childMap = dimensionMemberDataExtendToMap(cubeMemberMappingModel, value, null, profileId);
                                                    if(source.get(parentCode) != null ){
                                                        Map<String, Object> parentMap  = (Map<String, Object>) source.get(parentCode);
                                                        parentMap.putAll((Map<String, Object>)childMap.get(parentCode));
                                                        source.put(parentCode,parentMap);
                                                    }else {
                                                        source.put(parentCode,childMap.get(parentCode));
                                                    }
                                                }
                                            }
                                        }
                                    }
                                }
                            }
                        }
                        saveElasticSearchData(index, type,profileId,source);
                    } else {
                        throw new  Exception("索引不存在");
                    }
                }
            }else if (action.contains(action_delAll)) {
                String cubeId = "";
                if (dataMap.containsKey("cubeId")) {
                    cubeId = dataMap.remove("cubeId").toString();
                    Cube cube = this.getIndexTypeById(cubeId);
                    if (null != cube) {
                        elasticSearchUtil.deleteByField(cube.getIndexName(), cube.getIndexType(), "_index", cube.getIndexName());
@ -461,6 +525,9 @@ public class ElasticSearchDataProcessService {
                    Date dateValue = null;
                    if(keyValue.length() > 10){
                        if(keyValue.contains("-") ){
                            //2016-08-04T00:00:00Z+0800
                            keyValue = keyValue.substring(0,19);
                            keyValue = keyValue.replace('T',' ');
                            dateValue = DateUtil.parseDate(keyValue, DateUtil.DEFAULT_YMDHMSDATE_FORMAT);
                        }else {
                            //时间戳 1531130451000

+ 6 - 1
src/main/java/com/yihu/quota/service/cube/JdbcBasicService.java

@ -31,8 +31,13 @@ public class JdbcBasicService {
        List<Map<String, Object>> list = jdbcTemplate.queryForList(sql);
        if(list != null && list.size() > 0){
            map = list.get(0);
            if(map.get(cloumnCode) == null ){
                return null;
            }
            return map.get(cloumnCode).toString();
        }else {
            return null;
        }
        return map.get(cloumnCode).toString();
    }
}