Bläddra i källkod

1.实现Es2EsJob功能;
2.添加ES数据抽取器、是否抽烟转换器。

humingfen 7 år sedan
förälder
incheckning
1d9740faf0

+ 6 - 6
patient-co/patient-co-figure-label/src/main/java/com/yihu/wlyy/figure/label/convert/ConvertHelper.java

@ -105,24 +105,24 @@ public class ConvertHelper {
     * @return
     */
    public void generateDataSource(DataModel dataModel,SaveModel saveModel,String sourceType,String sources) {
        if(!StringUtils.endsWithIgnoreCase(SourceTypeEnum.MYSQL.toString(),sourceType)){
        if(!StringUtils.endsWithIgnoreCase(SourceTypeEnum.MYSQL.toString(),sourceType) && !StringUtils.endsWithIgnoreCase(SourceTypeEnum.ELASTICSEARCH.toString(),sourceType) ){
            return;
        }
        StringBuilder ids = new StringBuilder();
        // 默认值为0
        if(dataModel.getId() != 0){
        if(dataModel.getId() != null && dataModel.getId() != 0 ){
            ids.append(dataModel.getId()).append(",");
        }
        if(dataModel.getId1() != 0){
        if(dataModel.getId1() != null && dataModel.getId1() != 0 ){
            ids.append(dataModel.getId1()).append(",");
        }
        if(dataModel.getId2() != 0){
        if(dataModel.getId2() != null && dataModel.getId2() != 0 ){
            ids.append(dataModel.getId2()).append(",");
        }
        if(dataModel.getId3() != 0){
        if(dataModel.getId3() != null && dataModel.getId3() != 0 ){
            ids.append(dataModel.getId3()).append(",");
        }
        if(dataModel.getId4() != 0){
        if(dataModel.getId4() != null && dataModel.getId4() != 0 ){
            ids.append(dataModel.getId4());
        }
        String[] idArr = ids.toString().split(",");

+ 60 - 0
patient-co/patient-co-figure-label/src/main/java/com/yihu/wlyy/figure/label/convert/IsSmokingConvert.java

@ -0,0 +1,60 @@
package com.yihu.wlyy.figure.label.convert;
import com.yihu.figure_label.entity.FlLabelDict;
import com.yihu.wlyy.figure.label.model.DataModel;
import com.yihu.wlyy.figure.label.model.SaveModel;
import com.yihu.wlyy.figure.label.util.ConstantUtil;
import com.yihu.wlyy.figure.label.util.MakeIDUtil;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.*;
/**
 * @author lith on 2018.03.14
 * 是否有抽烟标签转换器
 */
@Component
public class IsSmokingConvert implements Convert{
    @Autowired
    private ConvertHelper convertHelper;
    @Override
    public List<SaveModel> convert(List<DataModel> modelList, List<FlLabelDict> flLabelDictList, String sourceType, String source) {
        List<SaveModel> saveModels = new ArrayList<>();
        // 抽烟code为2,代表未知
        String hasSmoking = "2";
        Map<String,String> dictMap = new HashMap<>();
        flLabelDictList.forEach(
                one ->{
                    dictMap.put(one.getLabelCode(),one.getLabelName());
                }
        );
        for(DataModel dataModel:modelList){
            if(StringUtils.isNotBlank(dataModel.getLabelValue())) {
                int dailySmoking = Integer.parseInt(dataModel.getLabelValue());
                if(dailySmoking > 0){
                    hasSmoking = "0";
                }else if(dailySmoking <= 0) {
                    hasSmoking = "1";
                }
            }
            SaveModel saveModel = new SaveModel();
            FlLabelDict dict = flLabelDictList.get(Integer.parseInt(hasSmoking));
            saveModel.setId(MakeIDUtil.makeEsSaveModelID(dataModel,dict));
            saveModel.setLabelType(dict.getParentCode());
            saveModel.setLabelCode(hasSmoking);
            saveModel.setLabelName(dictMap.get(hasSmoking));
            convertHelper.generateDataSource(dataModel,saveModel,sourceType,source);
            //如果fl_job_config表配置的id和数据来源不一致,则不保存数据
            if (StringUtils.isEmpty(saveModel.getSource())) {
                return new ArrayList<>();
            }
            saveModel.setCreateTime(DateFormatUtils.format(new Date(), ConstantUtil.date_format));
            saveModels.add(saveModel);
        }
        return saveModels;
    }
}

+ 2 - 1
patient-co/patient-co-figure-label/src/main/java/com/yihu/wlyy/figure/label/enums/SourceTypeEnum.java

@ -7,5 +7,6 @@ package com.yihu.wlyy.figure.label.enums;
public enum SourceTypeEnum {
    MYSQL,
    HBASE,
    FILE;
    FILE,
    ELASTICSEARCH;
}

+ 75 - 0
patient-co/patient-co-figure-label/src/main/java/com/yihu/wlyy/figure/label/extract/ESExtracter.java

@ -0,0 +1,75 @@
package com.yihu.wlyy.figure.label.extract;
import com.yihu.base.es.config.ElasticsearchUtil;
import com.yihu.wlyy.figure.label.model.DataModel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
/**
 * @author lith on 2018.03.14 -- Created by humingfen on 2018/4/16.
 * ES数据抽取器
 */
@Component
public class ESExtracter implements Extracter{
    private Logger logger = LoggerFactory.getLogger(ESExtracter.class);
    @Autowired
    private ElasticsearchUtil elasticsearchUtil;
    public List<DataModel> extractDataByJobConfigsql(String sql){
        List<DataModel> saveModels = new ArrayList<>();
        List<Map<String,Object>> list = elasticsearchUtil.excuteDataModel(sql);
        for(Map<String,Object> tempMap:list){
            try {
                DataModel dataModel = new DataModel();
                for (String s : tempMap.keySet()) {
                    String key = null;
                    Object value = tempMap.get(s);
                    if (s.startsWith("_")) {
                        continue;
                    }else if((s.equals("id") || s.equals("id1") || s.equals("id2")) && value != null) {
                        value = Integer.parseInt(String.valueOf(value));
                    }
                    key = "set" + UpFirstStr(s);
                    try {
                        if (value instanceof String) {
                            DataModel.class.getMethod(key, String.class).invoke(dataModel, value);
                        } else if (value instanceof Integer) {
                            DataModel.class.getMethod(key, Integer.class).invoke(dataModel, value);
                        } else if (value instanceof Double) {
                            DataModel.class.getMethod(key, Double.class).invoke(dataModel, value);
                        } else if (value instanceof java.util.Date) {
                            DataModel.class.getMethod(key, java.util.Date.class).invoke(dataModel, value);
                        }
                    } catch (Exception e) {
                        logger.error(e.getMessage());
                    }
                }
                saveModels.add(dataModel);
            }catch (Exception e) {
                e.printStackTrace();
            }
        }
        return saveModels;
    }
    /**
     * 首字母大写
     *
     * @param str
     * @return
     */
    private String UpFirstStr(String str) {
        return str.replaceFirst(str.substring(0, 1), str.substring(0, 1).toUpperCase());
    }
}

+ 83 - 4
patient-co/patient-co-figure-label/src/main/java/com/yihu/wlyy/figure/label/job/Es2EsJob.java

@ -1,18 +1,26 @@
package com.yihu.wlyy.figure.label.job;
import com.alibaba.druid.util.StringUtils;
import com.yihu.figure_label.entity.FlJobConfig;
import com.yihu.figure_label.entity.FlLabelDictJob;
import com.yihu.wlyy.figure.label.controller.JobController;
import com.yihu.wlyy.figure.label.convert.ConvertHelper;
import com.yihu.wlyy.figure.label.dao.FlJobConfigDao;
import com.yihu.wlyy.figure.label.dao.FlLabelDictJobDao;
import com.yihu.wlyy.figure.label.enums.JobSqlFieldTypeEnum;
import com.yihu.wlyy.figure.label.extract.ESExtracter;
import com.yihu.wlyy.figure.label.model.DataModel;
import com.yihu.wlyy.figure.label.model.SaveModel;
import com.yihu.wlyy.figure.label.service.JobService;
import com.yihu.wlyy.figure.label.storage.Store2ES;
import org.quartz.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;
import java.util.ArrayList;
import java.util.List;
@Service
@ -25,6 +33,18 @@ public class Es2EsJob implements Job {
    @Autowired
    private FlJobConfigDao flJobConfigDao;
    @Autowired
    private FlLabelDictJobDao flLabelDictJobDao;
    @Autowired
    private ESExtracter esExtracter;
    @Autowired
    private ConvertHelper convertHelper;
    @Autowired
    Store2ES store2ES;
    @Autowired
    private JobService jobService;
@ -40,13 +60,36 @@ public class Es2EsJob implements Job {
    private List<DataModel> dataModelList;
    private List<FollowupModel> followuplList;
    private String sourceType;
    private String source;
    /**
     * 数据表的id,有些数据是按时间增量查询,有些数据是按表的主键id增量查询
     */
    private long lastDataId;
    public void execute(JobExecutionContext context) throws JobExecutionException {
        JobDataMap paramsMap = context.getJobDetail().getJobDataMap();
        initParams(paramsMap);
        //提取数据
        extract();
        //数据转换
        List<SaveModel> list = new ArrayList<>();
        list = convert();
        //数据保存
        boolean bool = save(list);
        //增量存储成功后,修改增量的czrq时间为当前时间或主键id为上次获取到的数据的最后的一条的id
        if(StringUtils.endsWithIgnoreCase(this.flJobConfig.getSqlFieldType().toString(), JobSqlFieldTypeEnum.TIME.toString())){
            jobService.updateFieldValuetoCurrentTimeOrId(this.flJobConfigId,null,bool);
        }else if(StringUtils.endsWithIgnoreCase(this.flJobConfig.getSqlFieldType().toString(), JobSqlFieldTypeEnum.NUM.toString())){
            int index = dataModelList.size();
            lastDataId = dataModelList.get(index -1).getId();
            jobService.updateFieldValuetoCurrentTimeOrId(this.flJobConfigId,this.lastDataId,bool);
        }
    }
@ -56,18 +99,17 @@ public class Es2EsJob implements Job {
        this.source = String.valueOf(paramsMap.get("source"));
        this.flJobConfig = flJobConfigDao.findById(this.flJobConfigId);
        //this.sqlFiledValue = (String)paramsMap.get(this.flJobConfig.getSqlField().toString());
        this.sqlFiledValue = String.valueOf(paramsMap.get("sqlFiledValue"));
        this.sqlFiledCondition = paramsMap.getString("sqlFiledCondition");
        //没有传增量值,以数据库配置的默认值为查询条件
        if(StringUtils.isEmpty(this.sqlFiledValue)){
        if(org.springframework.util.StringUtils.isEmpty(this.sqlFiledValue)){
            this.sqlFiledValue = this.flJobConfig.getSqlFieldValue();
            this.sqlFiledCondition=">";
        }
        //1抽取数据
        String sql = this.flJobConfig.getSql();
        String sqlFiled = this.flJobConfig.getSqlField();
//      this.finalSql = getJobConfigSql(sql,sqlFiled,sqlFiledCondition,sqlFiledValue);
        this.finalSql = getFinalSql(sql,sqlFiled,sqlFiledCondition,sqlFiledValue);
    }
    public String getFinalSql(String sql, String sqlFiled, String sqlFiledCondition, String sqlFiledValue) {
@ -87,4 +129,41 @@ public class Es2EsJob implements Job {
        }
        return result.toString();
    }
    /**
     * 提取数据,按数据库中配置的增量条件提取
     */
    public void extract(){
        this.dataModelList = esExtracter.extractDataByJobConfigsql(this.finalSql);
    }
    /**
     * 转换
     */
    public List<SaveModel> convert(){
        List<SaveModel> list = new ArrayList<>();
        FlLabelDictJob flLabelDictJob = flLabelDictJobDao.findByJobId(this.flJobConfigId);
        try {
            list = convertHelper.convert(this.dataModelList, flLabelDictJob,this.sourceType,this.source);
        } catch (Exception e) {
            logger.error(e.getMessage());
        }
        return list;
    }
    /**
     * 保存
     * @param list
     */
    public boolean save(List<SaveModel> list){
        boolean bool = true;
        try {
            store2ES.save(list);
        }catch (Exception e){
            logger.error("save to elasticsearch failed,convet data count:" + list.size());
            bool = false;
        }
        return bool;
    }
}

+ 9 - 15
patient-co/patient-co-figure-label/src/main/java/com/yihu/wlyy/figure/label/job/Mysql2ESJob.java

@ -87,15 +87,15 @@ public class Mysql2ESJob implements Job {
        //数据保存
        boolean bool = save(list);
            //增量存储成功后,修改增量的czrq时间为当前时间或主键id为上次获取到的数据的最后的一条的id
            if(StringUtils.endsWithIgnoreCase(this.flJobConfig.getSqlFieldType().toString(), JobSqlFieldTypeEnum.TIME.toString())){
                jobService.updateFieldValuetoCurrentTimeOrId(this.flJobConfigId,null,bool);
            }else if(StringUtils.endsWithIgnoreCase(this.flJobConfig.getSqlFieldType().toString(), JobSqlFieldTypeEnum.NUM.toString())){
                int index = dataModelList.size();
                lastDataId = dataModelList.get(index -1).getId();
                jobService.updateFieldValuetoCurrentTimeOrId(this.flJobConfigId,this.lastDataId,bool);
            }
        //增量存储成功后,修改增量的czrq时间为当前时间或主键id为上次获取到的数据的最后的一条的id
        if(StringUtils.endsWithIgnoreCase(this.flJobConfig.getSqlFieldType().toString(), JobSqlFieldTypeEnum.TIME.toString())){
            jobService.updateFieldValuetoCurrentTimeOrId(this.flJobConfigId,null,bool);
        }else if(StringUtils.endsWithIgnoreCase(this.flJobConfig.getSqlFieldType().toString(), JobSqlFieldTypeEnum.NUM.toString())){
            int index = dataModelList.size();
            lastDataId = dataModelList.get(index -1).getId();
            jobService.updateFieldValuetoCurrentTimeOrId(this.flJobConfigId,this.lastDataId,bool);
        }
    }
    public void initParams(JobDataMap paramsMap){
@ -104,19 +104,13 @@ public class Mysql2ESJob implements Job {
        this.source = String.valueOf(paramsMap.get("source"));
        this.flJobConfig = flJobConfigDao.findById(this.flJobConfigId);
        this.sqlFiledValue = this.flJobConfig.getSqlFieldValue();
        this.sqlFiledCondition = paramsMap.getString("sqlFiledCondition");
       /* //this.sqlFiledValue = (String)paramsMap.get(this.flJobConfig.getSqlField().toString());
        this.sqlFiledValue = String.valueOf(paramsMap.get("sqlFiledValue"));
//        this.sqlFiledValue = String.valueOf(paramsMap.get("sqlFiledValue"));
        this.sqlFiledCondition = paramsMap.getString("sqlFiledCondition");
        this.sqlFiledValue = flJobConfig.getSqlFieldValue();
        this.sqlFiledCondition = ">";
        //没有传增量值,以数据库配置的默认值为查询条件
        if(StringUtils.isEmpty(this.sqlFiledValue)){
            this.sqlFiledValue = this.flJobConfig.getSqlFieldValue();
            this.sqlFiledCondition=">";
        }*/
        }
        //1抽取数据
        String sql = this.flJobConfig.getSql();
        String sqlFiled = this.flJobConfig.getSqlField();

+ 15 - 15
patient-co/patient-co-figure-label/src/main/java/com/yihu/wlyy/figure/label/model/DataModel.java

@ -8,11 +8,11 @@ public class DataModel {
    /**
     * 这些id用作记录数据来源,当有多个表时,记录多个表的id,并依据fl_job_config里的sourceTyep,source来区别
     */
    private int id;
    private int id1;
    private int id2;
    private int id3;
    private int id4;
    private Integer id;
    private Integer id1;
    private Integer id2;
    private Integer id3;
    private Integer id4;
    /**
     * 居民身份证号
     */
@ -56,43 +56,43 @@ public class DataModel {
     */
    private String status;
    public int getId() {
    public Integer getId() {
        return id;
    }
    public void setId(int id) {
    public void setId(Integer id) {
        this.id = id;
    }
    public int getId1() {
    public Integer getId1() {
        return id1;
    }
    public void setId1(int id1) {
    public void setId1(Integer id1) {
        this.id1 = id1;
    }
    public int getId2() {
    public Integer getId2() {
        return id2;
    }
    public void setId2(int id2) {
    public void setId2(Integer id2) {
        this.id2 = id2;
    }
    public int getId3() {
    public Integer getId3() {
        return id3;
    }
    public void setId3(int id3) {
    public void setId3(Integer id3) {
        this.id3 = id3;
    }
    public int getId4() {
    public Integer getId4() {
        return id4;
    }
    public void setId4(int id4) {
    public void setId4(Integer id4) {
        this.id4 = id4;
    }

+ 2 - 0
patient-co/patient-co-figure-label/src/main/java/com/yihu/wlyy/figure/label/service/JobService.java

@ -127,6 +127,8 @@ public class JobService {
        Map<String, Object> params = new HashMap<>();
        params.put("jobConfig", flJobConfigVO.getId());
        params.put("sourceType", flJobConfigVO.getSourceType());
        params.put("source", flJobConfigVO.getSource());
        if(!StringUtils.isEmpty(flJobConfigVO.getSqlFieldValue())){
            params.put(flJobConfigVO.getSqlField(),flJobConfigVO.getSqlFieldValue());
        }else{

+ 20 - 2
patient-co/patient-co-figure-label/src/main/java/com/yihu/wlyy/figure/label/util/MakeIDUtil.java

@ -1,13 +1,14 @@
package com.yihu.wlyy.figure.label.util;
import com.yihu.figure_label.entity.FlLabelDict;
import com.yihu.wlyy.figure.label.model.DataModel;
import org.springframework.util.StringUtils;
public class MakeIDUtil {
    /**
     * 生成存入es的id,确保数据不会重复
     * id由 idcard,parentCode,labelName,labelCode MD5加密而成。不用es自动生成的_id,实现数据插入排重
     * 生成从mysql获取,后存入es的id,确保数据不会重复
     * id由 idcard,parentCode,labelName,labelCode MD5加密而成。
     * @param dataModel
     * @param labelValue
     * @return
@ -24,6 +25,23 @@ public class MakeIDUtil {
        return MD5Util.GetMD5Code(idStr.toString());
    }
    /**
     * 生成从es获取,后存入es的id,确保数据不会重复
     * id由 id,id1,parentCode,labelName,labelCode MD5加密而成。
     * @param dataModel
     * @param flLabelDict
     * @return
     */
    public static String makeEsSaveModelID(DataModel dataModel, FlLabelDict flLabelDict) {
        StringBuilder idStr = new StringBuilder();
        idStr.append(dataModel.getId() + "").append("-")
                .append(dataModel.getId1() + "").append("-")
                .append(flLabelDict.getParentCode()).append("-")
                .append(flLabelDict.getLabelCode()).append("-")
                .append(flLabelDict.getLabelName());
        return MD5Util.GetMD5Code(idStr.toString());
    }
    public static void main(String[] args) {
        DataModel dataModel = new DataModel();

+ 11 - 1
patient-co/patient-co-figure-label/src/main/resources/application-dev.yml

@ -13,9 +13,19 @@ spring:
      password: 123456
  data:
    elasticsearch:
      cluster-nodes-jest: http://172.19.103.45:9200,http://172.19.103.68:9200  #多个逗号分割
      cluster-name: jkzl #es集群的名字
      cluster-nodes: 172.19.103.68:9300
#      ,172.19.103.45:9300, #多个逗号分割
      cluster-nodes-jest: http://172.19.103.68:9200
#      ,http://172.19.103.45:9200  #多个逗号分割
      repositories:
        enabled: true
      properties:
        client:
          transport:
            sniff: false #开启嗅探集群  用nginx代理一层过后会出现ip解析失败问题
quartz:
  namespace: patient-co-figure-label ##quartz的命名空间,名称一样实现消费负载