Procházet zdrojové kódy

优化从hbase中查询数据,解决OOM bug

LiTaohong před 7 roky
rodič
revize
571fb7a363

+ 9 - 0
common/common-entity/src/main/java/com/yihu/figure_label/entity/FlJobConfig.java

@ -26,6 +26,7 @@ public class FlJobConfig extends IdEntity {
    private String sourceType;
    private String source;
    private String datasource;
    private String extractField;
    public String getJobName() {
        return jobName;
@ -122,4 +123,12 @@ public class FlJobConfig extends IdEntity {
    public void setDatasource(String datasource) {
        this.datasource = datasource;
    }
    public String getExtractField() {
        return extractField;
    }
    public void setExtractField(String extractField) {
        this.extractField = extractField;
    }
}

+ 10 - 1
patient-co/patient-co-figure-label/src/main/java/com/yihu/wlyy/figure/label/convert/EhrHealthProblemConvert.java

@ -32,7 +32,16 @@ public class EhrHealthProblemConvert implements Convert{
        hbaseDatas.forEach(
                hbaseMap -> {
                    SaveModel saveModel = new SaveModel();
                    saveModel.setIdcard(String.valueOf(hbaseMap.get("EHR_000017")));
                    Object idcard = null;
                    // ehr那边 EHR_000017 demographic_id 为身份证编码,优先选择EHR_000017,demographic_id作为补录
                    if(null != hbaseMap.get("EHR_000017")){
                        idcard = hbaseMap.get("EHR_000017");
                    }else if(null != hbaseMap.get("demographic_id")){
                        idcard = hbaseMap.get("demographic_id");
                    }else{
                        return;
                    }
                    saveModel.setIdcard(String.valueOf(idcard));
                    saveModel.setDictCode(one.getDictCode());
                    Object labelCode = hbaseMap.get("health_problem");
                    if(null == labelCode){

+ 10 - 1
patient-co/patient-co-figure-label/src/main/java/com/yihu/wlyy/figure/label/convert/EhrICD10CMConvert.java

@ -32,7 +32,16 @@ public class EhrICD10CMConvert implements Convert{
        hbaseDatas.forEach(
                hbaseMap -> {
                    SaveModel saveModel = new SaveModel();
                    saveModel.setIdcard(String.valueOf(hbaseMap.get("EHR_000017")));
                    Object idcard = null;
                    // ehr那边 EHR_000017 demographic_id 为身份证编码,优先选择EHR_000017,demographic_id作为补录
                    if(null != hbaseMap.get("EHR_000017")){
                        idcard = hbaseMap.get("EHR_000017");
                    }else if(null != hbaseMap.get("demographic_id")){
                        idcard = hbaseMap.get("demographic_id");
                    }else{
                        return;
                    }
                    saveModel.setIdcard(String.valueOf(idcard));
                    saveModel.setDictCode(one.getDictCode());
                    Object labelCode = hbaseMap.get("diagnosis");
                    if(null == labelCode){

+ 57 - 37
patient-co/patient-co-figure-label/src/main/java/com/yihu/wlyy/figure/label/extract/HbaseExtracter.java

@ -4,15 +4,19 @@ import com.yihu.base.SolrHelper;
import com.yihu.base.hbase.HBaseHelper;
import com.yihu.wlyy.figure.label.model.DataModel;
import com.yihu.wlyy.figure.label.util.TimeUtil;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CollectionUtils;
import org.apache.solr.common.SolrDocumentList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.hadoop.hbase.HbaseTemplate;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.*;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@ -41,18 +45,48 @@ public class HbaseExtracter implements Extracter{
        return null;
    }
    public  List<Map<String,Object>>  extractData(String core, String q, String fq) {
        List<Map<String,Object>> result = new ArrayList<>();
    public  List<Map<String,Object>>  extractData(String core, String q, String fq,String extractColumn) {
        List<Map<String,Object>> resultList = new ArrayList<>();
        String[] arr = core.split(";");
        String solrCore = arr[0];
        String table = arr[1];
        //组装提取的列
        String basicFl = "";
        String dFl = "";
        String[] columnArr = extractColumn.split(";");
        if(null != columnArr){
            for(int i = 0;i < columnArr.length; i++){
                if (columnArr[i].contains("basic=")) {
                    basicFl = columnArr[i].split("=")[1];
                } else if (columnArr[i].contains("d=")) {
                    dFl = columnArr[i].split("=")[1];
                }
            }
        }
        List<String> rowkeys = new ArrayList<>();
        rowkeys = getRowkeys(solrCore,q,fq);
        for(String rowkey : rowkeys){
           Map<String,Object> map = hBaseHelper.getResultMap(table,rowkey);
            result.add(map);
        long start = System.currentTimeMillis();
        String logTitle = "get data from hbase";
        TimeUtil.start(logger, logTitle, start);
        Result[] resultArr = hBaseHelper.getResultList(table,rowkeys,basicFl,dFl);
        for(int i = 0; i < resultArr.length; i++){
            List<Cell> ceList = resultArr[i].listCells();
            if(CollectionUtils.isEmpty(ceList)){
                continue;
            }
            Map<String, Object> map = new HashMap();
            map.put("rowkey",rowkeys.get(i));
            Iterator var5 = ceList.iterator();
            while(var5.hasNext()) {
                Cell cell = (Cell)var5.next();
                map.put(Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()), Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()));
            }
            resultList.add(map);
        }
        return result;
        TimeUtil.finish(logger, logTitle, start, System.currentTimeMillis());
        logger.info("hbase data counts:" + resultList.size());
        return resultList;
    }
    public List<String> getRowkeys(String core, String q, String fq) {
@ -66,46 +100,32 @@ public class HbaseExtracter implements Extracter{
            e.printStackTrace();
            logger.error("get solr query error");
        }
       /* int number = (int) count / numPerPage + 1;
        List<String> rowkeys = new ArrayList<>();
        ExecutorService fixedThreadPool = Executors.newFixedThreadPool(number);
        List<Callable<List<String>>> threadList = new ArrayList<>();
        for (int i = 0; i < number; i++) {
            MutilThreadSearchSolrIndexExtracter searchSolrIndexExtracter = new MutilThreadSearchSolrIndexExtracter(solrHelper, core, q, fq, i * numPerPage, numPerPage);
            threadList.add(searchSolrIndexExtracter);
        }
        long start = System.currentTimeMillis();
        String logTitle = "therad get data";
        TimeUtil.start(logger, logTitle, start);
        try {
            List<Future<List<String>>> futureList = fixedThreadPool.invokeAll(threadList);
            //取回线程执行的结果
            for (Future future : futureList) {
                rowkeys.addAll((List<String>) future.get());
            }
        } catch (Exception e) {
            e.printStackTrace();
            logger.error("get futureList result error");
        }
        long finish = System.currentTimeMillis();
        TimeUtil.finish(logger, logTitle, start, finish);
        return rowkeys;*/
        return rowkeyList;
    }
    /**
     * 从solr中获取rowkey
     * @param core
     * @param q
     * @param fq
     * @param start
     * @param rows
     * @return
     */
    public List<String> getRowkeysFromSolr(String core,String q,String[] fq,long start,long rows){
        List<String> rowkeyList = new ArrayList<>();
        SolrDocumentList solrDocumentList = null;
        String logTitle = "get data from solr";
        TimeUtil.start(logger, logTitle, start);
        String logTitle = "get rowkeys from solr";
        long startTime = System.currentTimeMillis();
        TimeUtil.start(logger, logTitle, startTime);
        try {
            solrDocumentList = solrHelper.queryfl(core,q,fq,null,"rowkey",start,rows);
        } catch (Exception e) {
            e.printStackTrace();
        }
        long finish = System.currentTimeMillis();
        TimeUtil.finish(logger, logTitle, start, finish);
        logger.info("job get data counts:" + solrDocumentList.size());
        TimeUtil.finish(logger, logTitle, startTime, finish);
        logger.info("rowkeys counts:" + solrDocumentList.size());
        solrDocumentList.forEach(
                document -> {
                    rowkeyList.add(document.get("rowkey").toString());

+ 18 - 2
patient-co/patient-co-figure-label/src/main/java/com/yihu/wlyy/figure/label/job/Hbase2ESJob.java

@ -13,6 +13,7 @@ import com.yihu.wlyy.figure.label.model.DataModel;
import com.yihu.wlyy.figure.label.model.SaveModel;
import com.yihu.wlyy.figure.label.service.JobService;
import com.yihu.wlyy.figure.label.storage.Store2ES;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.quartz.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -74,6 +75,8 @@ public class Hbase2ESJob implements Job {
    private String fq;
    private String extractColumn;
    /**
     * 数据表的id,有些数据是按时间增量查询,有些数据是按表的主键id增量查询
     */
@ -122,8 +125,10 @@ public class Hbase2ESJob implements Job {
        }
        //1抽取数据
        this.q = this.flJobConfig.getSql();
        fq = this.flJobConfig.getSqlField() + ":" + "[* TO " + this.sqlFiledValue + "]";
        fq = this.flJobConfig.getSqlField() + ":" + "[" + toSolrTime(this.sqlFiledValue) +" TO *]";
        //要从hbase中查询的列
        this.extractColumn = this.flJobConfig.getExtractField();
    }
    /**
@ -131,7 +136,7 @@ public class Hbase2ESJob implements Job {
     */
    public void extract(){
        this.datas = hbaseExtracter.extractData(this.datasource,this.q,this.fq);
        this.datas = hbaseExtracter.extractData(this.datasource,this.q,this.fq,this.extractColumn);
    }
    /**
@ -164,5 +169,16 @@ public class Hbase2ESJob implements Job {
        return bool;
    }
    /**
     * 转换为Solr时间格式
     * @param datetime
     * @return
     */
    public String toSolrTime(String datetime){
        String solrTime = "";
        String[] timeArr = datetime.split(" ");
        solrTime = timeArr[0] + "T" + timeArr[1]+"Z";
        return solrTime;
    }
}