Browse Source

数据解析 增加 数据关联查询

jkzlzhoujie 6 years ago
parent
commit
9937679c46

+ 8 - 1
src/main/java/com/yihu/quota/service/cube/ElasticSearchDataProcessService.java

@ -133,6 +133,10 @@ public class ElasticSearchDataProcessService {
                                    rowKey = profileId;
                                    source.put(id_k,profileId);
                                    source.put(rowKey_k,profileId);
                                    CubeMappingModel mappingModel = cubeMappingService.findParentDimension(cubeMappingModel.getParentId());
                                    String parentCode = mappingModel.getDimensionCode();
                                    cubeMappingModel.setParentCode(parentCode);
                                    cubeMappingModel.setChildSaveType(mappingModel.getChildSaveType());
                                    source.putAll(dimensionMemberDataExtendToMap(cubeMappingModel,baseCloumnValue,subRowKey,profileId));
                                }
                                String index = cubeMappingModel.getIndexName();
@ -262,7 +266,10 @@ public class ElasticSearchDataProcessService {
    public Map<String,Object> dimensionMemberDataExtendToMap(CubeMappingModel cubeMappingModel,String baseCloumnValue,String subRowKey,String profileId) throws Exception {
        try {
            int parentId = cubeMappingModel.getParentId();
            int childSaveType = cubeMappingModel.getChildSaveType();
            int childSaveType = 1;
            if(cubeMappingModel.getChildSaveType()  != null ){
                childSaveType = cubeMappingModel.getChildSaveType();
            }
            String parentCode = cubeMappingModel.getParentCode();
            Map<String, Object> source  = new HashMap<>();

+ 20 - 11
src/main/java/com/yihu/quota/service/job/SingleTableJob.java

@ -1,5 +1,7 @@
package com.yihu.quota.service.job;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.gson.Gson;
import com.yihu.quota.contants.JobConstant;
import com.yihu.quota.kafka.Producer;
@ -93,6 +95,8 @@ public class SingleTableJob implements Job {
    private Producer producer;
    @Autowired
    private JdbcTemplate jdbcTemplate;
    @Autowired
    private ObjectMapper objectMapper;
    @Override
    public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
@ -150,7 +154,11 @@ public class SingleTableJob implements Job {
        size = jobDataMap.getString("size");
        start = jobDataMap.getString("start");
        end = jobDataMap.getString("end");
        execType = JobConstant.ExecType.fromInt(jobDataMap.getIntValue("execType"));
        if(jobDataMap.get("execType") != null){
            execType = JobConstant.ExecType.fromInt(jobDataMap.getIntValue("execType"));
        }else {
            execType = JobConstant.ExecType.Full;
        }
        searchColumn = jobDataMap.getString("searchColumn");
        cubeId = jobDataMap.getString("cubeId");
    }
@ -164,10 +172,13 @@ public class SingleTableJob implements Job {
            dataMap.put("action", "DelAll");
            dataMap.put("cubeId", cubeId);
            Gson gson = new Gson();
            String jsonData = gson.toJson(dataMap);
            logger.info("清除消息:{}",jsonData);
            producer.sendMessage(Producer.sepTopic, jsonData);
            try {
                String jsonData = objectMapper.writeValueAsString(dataMap);
                logger.info("清除消息:{}",jsonData);
                producer.sendMessage(Producer.sepTopic, jsonData);
            } catch (JsonProcessingException e) {
                e.printStackTrace();
            }
        }
    }
@ -187,17 +198,15 @@ public class SingleTableJob implements Job {
                if (key.equals(primeKey)) {
                    dataMap.put("rowkey", value);
                }
                dataMap.put(key, value);
            });
            Gson gson = new Gson();
            String jsonData = gson.toJson(dataMap);
            logger.info("保存消息:{}",jsonData);
            try {
                String jsonData = objectMapper.writeValueAsString(dataMap);
                Thread.sleep(50);
                logger.info("清除消息:{}",jsonData);
                producer.sendMessage(Producer.sepTopic, jsonData);
            } catch (JsonProcessingException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }