瀏覽代碼

全量导入数据时一次解析多条数据 增加解析效率

jkzlzhoujie 6 年之前
父節點
當前提交
cc19aac9df

+ 0 - 1
src/main/java/com/yihu/quota/service/cube/ElasticSearchDataProcessService.java

@ -505,7 +505,6 @@ public class ElasticSearchDataProcessService {
                    }else {
                        dateValue = DateUtil.parseDate(keyValue, DateUtil.DEFAULT_DATE_YMD_FORMAT);
                    }
                    System.out.println("keyval=" + keyValue + ",dateVal=" + dateValue);
                    //es 保存是少8小时
                    Calendar ca = Calendar.getInstance();
                    ca.setTime(dateValue);

+ 40 - 53
src/main/java/com/yihu/quota/service/job/SingleTableJob.java

@ -209,66 +209,53 @@ public class SingleTableJob implements Job {
        dataMap.put("cubeId", cubeId);
        List<Map<String,Object>> dataList = new ArrayList<>();
        int p = 1;
        int perCount = 200;
        int d = list.size()/perCount;
        int y = list.size()%perCount;
        for(int i = 0; i < list.size() ; i++){
            if(( i - 100*p ) == 0){
                p++;
                try {
                    dataMap.put("dataList", dataList);
                    String jsonData = objectMapper.writeValueAsString(dataMap);
                    Thread.sleep(50);
                    logger.info("清除消息:{}",jsonData);
                    boolean sendFlag = producer.sendMessage(Producer.sepTopic, jsonData);
                    if( !sendFlag ){
                        return;
                    }
                } catch (JsonProcessingException e) {
                    e.printStackTrace();
                } catch (InterruptedException e) {
                    e.printStackTrace();
            Map<String,Object> map = new HashMap<>();
            Map<String,Object> item = list.get(i);
            item.forEach((key, value) -> {
                if (key.equals(primeKey)) {
                    map.put("rowkey", value);
                }
                map.put(key, value);
            });
            dataList.add(map);
            if(list.size() < perCount){
                dataMap.put("dataList", dataList);
                sendDataMessage(dataMap);
                dataList.clear();
            }else {
                Map<String,Object> map = new HashMap<>();
                Map<String,Object> item = list.get(i);
                item.forEach((key, value) -> {
                    if (key.equals(primeKey)) {
                        map.put("rowkey", value);
                if((i+1) == perCount*p){
                    p++;
                    dataMap.put("dataList", dataList);
                    sendDataMessage(dataMap);
                    dataList.clear();
                }else{
                    //有余数时,最后一组数据
                    if(d > 0 && y > 0 && i==list.size()-1){
                        dataMap.put("dataList", dataList);
                        sendDataMessage(dataMap);
                        dataList.clear();
                    }
                    map.put(key, value);
                });
                dataList.add(map);
                }
            }
        }
    }
//        list.forEach(item -> {
//
//            Map<String, Object> dataMap = new HashMap<>(item.size());
//            dataMap.put("database", database);
//            dataMap.put("dataSource", "mysql");
//            dataMap.put("action", "Put");
//            dataMap.put("table", table);
//            item.forEach((key, value) -> {
//                if (key.equals(primeKey)) {
//                    dataMap.put("rowkey", value);
//                }
//                dataMap.put(key, value);
//            });
//            try {
//                String jsonData = objectMapper.writeValueAsString(dataMap);
//                Thread.sleep(50);
//                logger.info("清除消息:{}",jsonData);
//                boolean sendFlag = producer.sendMessage(Producer.sepTopic, jsonData);
//                if( !sendFlag ){
//                    return;
//                }
//            } catch (JsonProcessingException e) {
//                e.printStackTrace();
//            } catch (InterruptedException e) {
//                e.printStackTrace();
//            }
//        });
    private boolean sendDataMessage(Map<String,Object> dataMap){
        try {
            String jsonData = objectMapper.writeValueAsString(dataMap);
            Thread.sleep(50);
//            logger.info("消息:{}",jsonData);
            return producer.sendMessage(Producer.sepTopic, jsonData);
        } catch (JsonProcessingException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return false;
    }
    private List<Map<String, Object>> fetch(Integer start, Integer row) {