Browse Source

全量导入数据时一次解析多条数据 增加解析效率

jkzlzhoujie 6 years ago
parent
commit
4538ccd043
1 changed files with 66 additions and 25 deletions
  1. 66 25
      src/main/java/com/yihu/quota/service/job/SingleTableJob.java

+ 66 - 25
src/main/java/com/yihu/quota/service/job/SingleTableJob.java

@ -16,6 +16,7 @@ import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Component;
import org.springframework.web.context.support.SpringBeanAutowiringSupport;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -200,33 +201,73 @@ public class SingleTableJob implements Job {
            logger.warn("未获取到数据");
            return;
        }
        list.forEach(item -> {
            Map<String, Object> dataMap = new HashMap<>(item.size());
            dataMap.put("database", database);
            dataMap.put("dataSource", "mysql");
            dataMap.put("action", "Put");
            dataMap.put("table", table);
            item.forEach((key, value) -> {
                if (key.equals(primeKey)) {
                    dataMap.put("rowkey", value);
                }
                dataMap.put(key, value);
            });
            try {
                String jsonData = objectMapper.writeValueAsString(dataMap);
                Thread.sleep(50);
                logger.info("清除消息:{}",jsonData);
                boolean sendFlag = producer.sendMessage(Producer.sepTopic, jsonData);
                if( !sendFlag ){
                    return;
        Map<String, Object> dataMap = new HashMap<>();
        dataMap.put("database", database);
        dataMap.put("dataSource", "mysql");
        dataMap.put("action", "PutAll");
        dataMap.put("table", table);
        dataMap.put("cubeId", cubeId);
        List<Map<String,Object>> dataList = new ArrayList<>();
        int p = 1;
        for(int i = 0; i < list.size() ; i++){
            if(( i - 100*p ) == 0){
                p++;
                try {
                    dataMap.put("dataList", dataList);
                    String jsonData = objectMapper.writeValueAsString(dataMap);
                    Thread.sleep(50);
                    logger.info("清除消息:{}",jsonData);
                    boolean sendFlag = producer.sendMessage(Producer.sepTopic, jsonData);
                    if( !sendFlag ){
                        return;
                    }
                } catch (JsonProcessingException e) {
                    e.printStackTrace();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            } catch (JsonProcessingException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }else {
                Map<String,Object> map = new HashMap<>();
                Map<String,Object> item = list.get(i);
                item.forEach((key, value) -> {
                    if (key.equals(primeKey)) {
                        map.put("rowkey", value);
                    }
                    map.put(key, value);
                });
                dataList.add(map);
            }
        });
        }
//        list.forEach(item -> {
//
//            Map<String, Object> dataMap = new HashMap<>(item.size());
//            dataMap.put("database", database);
//            dataMap.put("dataSource", "mysql");
//            dataMap.put("action", "Put");
//            dataMap.put("table", table);
//            item.forEach((key, value) -> {
//                if (key.equals(primeKey)) {
//                    dataMap.put("rowkey", value);
//                }
//                dataMap.put(key, value);
//            });
//            try {
//                String jsonData = objectMapper.writeValueAsString(dataMap);
//                Thread.sleep(50);
//                logger.info("清除消息:{}",jsonData);
//                boolean sendFlag = producer.sendMessage(Producer.sepTopic, jsonData);
//                if( !sendFlag ){
//                    return;
//                }
//            } catch (JsonProcessingException e) {
//                e.printStackTrace();
//            } catch (InterruptedException e) {
//                e.printStackTrace();
//            }
//        });
    }