2 Commits 46652e8257 ... 4538ccd043

Auteur SHA1 Bericht Datum
  jkzlzhoujie 4538ccd043 全量导入数据时一次解析多条数据 增加解析效率 6 jaren geleden
  jkzlzhoujie 893c3b7785 全量导入数据时一次解析多条数据 增加解析效率 6 jaren geleden

+ 37 - 33
src/main/java/com/yihu/quota/service/cube/ElasticSearchDataProcessService.java

@ -50,8 +50,6 @@ public class ElasticSearchDataProcessService {
    private ObjectMapper objectMapper;
    @Autowired
    private RelevanceFunc relevanceFunc;
    @Autowired
    private DimensionService dimensionService;
    /**
     *
@ -123,7 +121,7 @@ public class ElasticSearchDataProcessService {
                        List<CubeMappingModel> cubeMappingModels = cubeMappingService.findCubeMappingModelsByField(table, baseCloumnCode);
                        if(cubeMappingModels != null && cubeMappingModels.size() > 0){
                            for(CubeMappingModel cubeMappingModel :cubeMappingModels){
                                System.out.println("维度code = " + cubeMappingModel.getDimensionCode() +  ",维度类型:" + cubeMappingModel.getDataType() + ",值=" + baseCloumnValue );
//                                System.out.println("维度code = " + cubeMappingModel.getDimensionCode() +  ",维度类型:" + cubeMappingModel.getDataType() + ",值=" + baseCloumnValue );
                                Map<String, Object> source  = new HashMap<>();
                                if(cubeMappingModel.getParentId() == null){
                                    source.put(id_k,rowKey);
@ -156,46 +154,53 @@ public class ElasticSearchDataProcessService {
                }
            }else if (action.equals(action_putAll)) {
                if (dataMap.containsKey("cubeId")) {
                    //采用redis
                    Cube cube = cubeService.findOne(Integer.parseInt(cubeId));
                    if (null != cube) {
                        String index = cube.getIndexName();
                        String type = cube.getIndexType();
                        Map<String, Object> source  = new HashMap<>();
                        source.put(id_k,rowKey);
                        source.put(rowKey_k,rowKey);
                        for(String baseCloumnCode : dataMap.keySet()){
                            System.out.println("列:" + baseCloumnCode );
                            if(dataMap.get(baseCloumnCode)!= null){
                                baseCloumnValue = dataMap.get(baseCloumnCode).toString();
                                List<CubeMappingModel> cubeMappingModels = cubeMappingService.findCubeMappingModelsByField(table, baseCloumnCode);
                                if(cubeMappingModels != null && cubeMappingModels.size() > 0){
                                    for(CubeMappingModel cubeMappingModel :cubeMappingModels){
                                        System.out.println("维度code = " + cubeMappingModel.getDimensionCode() +  ",维度类型:" + cubeMappingModel.getDataType() + ",值=" + baseCloumnValue );
                                        if(cubeMappingModel.getParentId() == null){
                                            source.putAll(dimensionDataExtendToMap(cubeMappingModel, cubeMappingModel.getDimensionCode(), baseCloumnValue));
                                        }else { // 子集成员
                                            CubeMappingModel mappingModel = cubeMappingService.findParentDimension(cubeMappingModel.getParentId());
                                            String parentCode = mappingModel.getDimensionCode();
                                            cubeMappingModel.setParentCode(parentCode);
                                            cubeMappingModel.setChildSaveType(mappingModel.getChildSaveType());
                                            if(mappingModel != null){
                                                Map<String,Object> childMap = new HashMap<>();
                                                childMap = dimensionMemberDataExtendToMap(cubeMappingModel,baseCloumnValue,subRowKey,profileId);
                                                if(source.get(parentCode) != null ){
                                                    Map<String, Object> parentMap  = (Map<String, Object>) source.get(parentCode);
                                                    parentMap.putAll((Map<String, Object>)childMap.get(parentCode));
                                                    source.put(parentCode,parentMap);
                                                }else {
                                                    source.put(parentCode,childMap.get(parentCode));
                        List<Map<String,Object>> dataList = (List<Map<String,Object>>)dataMap.get("dataList");
                        List<Map<String,Object>> sourcesList = new ArrayList<>();
                        for(Map<String,Object> oneDataMap : dataList){
                            Map<String, Object> source  = new HashMap<>();
                            for(String baseCloumnCode : oneDataMap.keySet()){
                                source.put(id_k, oneDataMap.get(rowKey_k).toString());
                                source.put(rowKey_k,oneDataMap.get(rowKey_k).toString());
//                                System.out.println("列:" + baseCloumnCode );
                                if(oneDataMap.get(baseCloumnCode)!= null){
                                    baseCloumnValue = oneDataMap.get(baseCloumnCode).toString();
                                    //这个 可以采用保存到redis 方式减少数据库压力
                                    List<CubeMappingModel> cubeMappingModels = cubeMappingService.findCubeMappingModelsByField(table, baseCloumnCode);
                                    if(cubeMappingModels != null && cubeMappingModels.size() > 0){
                                        for(CubeMappingModel cubeMappingModel :cubeMappingModels){
//                                            System.out.println("维度code = " + cubeMappingModel.getDimensionCode() +  ",维度类型:" + cubeMappingModel.getDataType() + ",值=" + baseCloumnValue );
                                            if(cubeMappingModel.getParentId() == null){
                                                source.putAll(dimensionDataExtendToMap(cubeMappingModel, cubeMappingModel.getDimensionCode(), baseCloumnValue));
                                            }else { // 子集成员
                                                //这个 可以采用保存到redis 方式减少数据库压力
                                                CubeMappingModel mappingModel = cubeMappingService.findParentDimension(cubeMappingModel.getParentId());
                                                String parentCode = mappingModel.getDimensionCode();
                                                cubeMappingModel.setParentCode(parentCode);
                                                cubeMappingModel.setChildSaveType(mappingModel.getChildSaveType());
                                                if(mappingModel != null){
                                                    Map<String,Object> childMap = new HashMap<>();
                                                    childMap = dimensionMemberDataExtendToMap(cubeMappingModel,baseCloumnValue,subRowKey,profileId);
                                                    if(source.get(parentCode) != null ){
                                                        Map<String, Object> parentMap  = (Map<String, Object>) source.get(parentCode);
                                                        parentMap.putAll((Map<String, Object>)childMap.get(parentCode));
                                                        source.put(parentCode,parentMap);
                                                    }else {
                                                        source.put(parentCode,childMap.get(parentCode));
                                                    }
                                                }
                                            }
                                        }
                                    }
                                }
                            }
                            sourcesList.add(source);
                        }
                        saveElasticSearchData(index, type,profileId,source);
                        elasticSearchUtil.bulkIndex(index,type,sourcesList);
                    } else {
                        throw new  Exception("索引不存在");
                    }
@ -464,7 +469,6 @@ public class ElasticSearchDataProcessService {
        }
    }
    /**
     * 数据类型转换
     * @param dataType

+ 66 - 25
src/main/java/com/yihu/quota/service/job/SingleTableJob.java

@ -16,6 +16,7 @@ import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Component;
import org.springframework.web.context.support.SpringBeanAutowiringSupport;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -200,33 +201,73 @@ public class SingleTableJob implements Job {
            logger.warn("未获取到数据");
            return;
        }
        list.forEach(item -> {
            Map<String, Object> dataMap = new HashMap<>(item.size());
            dataMap.put("database", database);
            dataMap.put("dataSource", "mysql");
            dataMap.put("action", "Put");
            dataMap.put("table", table);
            item.forEach((key, value) -> {
                if (key.equals(primeKey)) {
                    dataMap.put("rowkey", value);
                }
                dataMap.put(key, value);
            });
            try {
                String jsonData = objectMapper.writeValueAsString(dataMap);
                Thread.sleep(50);
                logger.info("清除消息:{}",jsonData);
                boolean sendFlag = producer.sendMessage(Producer.sepTopic, jsonData);
                if( !sendFlag ){
                    return;
        Map<String, Object> dataMap = new HashMap<>();
        dataMap.put("database", database);
        dataMap.put("dataSource", "mysql");
        dataMap.put("action", "PutAll");
        dataMap.put("table", table);
        dataMap.put("cubeId", cubeId);
        List<Map<String,Object>> dataList = new ArrayList<>();
        int p = 1;
        for(int i = 0; i < list.size() ; i++){
            if(( i - 100*p ) == 0){
                p++;
                try {
                    dataMap.put("dataList", dataList);
                    String jsonData = objectMapper.writeValueAsString(dataMap);
                    Thread.sleep(50);
                    logger.info("清除消息:{}",jsonData);
                    boolean sendFlag = producer.sendMessage(Producer.sepTopic, jsonData);
                    if( !sendFlag ){
                        return;
                    }
                } catch (JsonProcessingException e) {
                    e.printStackTrace();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            } catch (JsonProcessingException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }else {
                Map<String,Object> map = new HashMap<>();
                Map<String,Object> item = list.get(i);
                item.forEach((key, value) -> {
                    if (key.equals(primeKey)) {
                        map.put("rowkey", value);
                    }
                    map.put(key, value);
                });
                dataList.add(map);
            }
        });
        }
//        list.forEach(item -> {
//
//            Map<String, Object> dataMap = new HashMap<>(item.size());
//            dataMap.put("database", database);
//            dataMap.put("dataSource", "mysql");
//            dataMap.put("action", "Put");
//            dataMap.put("table", table);
//            item.forEach((key, value) -> {
//                if (key.equals(primeKey)) {
//                    dataMap.put("rowkey", value);
//                }
//                dataMap.put(key, value);
//            });
//            try {
//                String jsonData = objectMapper.writeValueAsString(dataMap);
//                Thread.sleep(50);
//                logger.info("清除消息:{}",jsonData);
//                boolean sendFlag = producer.sendMessage(Producer.sepTopic, jsonData);
//                if( !sendFlag ){
//                    return;
//                }
//            } catch (JsonProcessingException e) {
//                e.printStackTrace();
//            } catch (InterruptedException e) {
//                e.printStackTrace();
//            }
//        });
    }