
Implement single-table Job data extraction logic

Airhead authored 6 years ago
commit 1aaa1b9135

+ 1 - 0
src/main/java/com/yihu/quota/dao/dict/SystemDictDao.java

@@ -7,5 +7,6 @@ import org.springframework.data.repository.PagingAndSortingRepository;
/**
 * Created by chenweida on 2017/6/1.
 */
@Deprecated
public interface SystemDictDao extends PagingAndSortingRepository<SystemDict, Long>, JpaSpecificationExecutor<SystemDict> {
}

+ 1 - 0
src/main/java/com/yihu/quota/dao/dict/SystemDictListDao.java

@@ -7,5 +7,6 @@ import org.springframework.data.repository.PagingAndSortingRepository;
/**
 * Created by chenweida on 2017/6/1.
 */
@Deprecated
public interface SystemDictListDao extends PagingAndSortingRepository<SystemDictList, Long>, JpaSpecificationExecutor<SystemDictList> {
}

+ 64 - 0
src/main/java/com/yihu/quota/kafka/Producer.java

@@ -0,0 +1,64 @@
package com.yihu.quota.kafka;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.stereotype.Component;
import org.yaml.snakeyaml.Yaml;
import java.io.FileInputStream;
import java.net.URL;
import java.util.HashMap;
import java.util.Map;
/**
 * @author janseny
 * @date 2018/9/14
 */
@Component
public class Producer {
    private static final String topic = "sep-hbase-data";
    private final Logger logger = LoggerFactory.getLogger(Producer.class);
    public ProducerFactory<String, String> producerFactory() {
        String kafkaBrokerAddress = "";
        try {
            // Read kafka.broker.address from application.yml on the classpath
            Yaml yaml = new Yaml();
            URL url = Producer.class.getClassLoader().getResource("application.yml");
            if (url != null) {
                Map map = (Map) yaml.load(new FileInputStream(url.getFile()));
                Map kafka = (Map) map.get("kafka");
                Map broker = (Map) kafka.get("broker");
                kafkaBrokerAddress = broker.get("address").toString();
            }
        } catch (Exception e) {
            logger.error("Failed to read kafka broker address from application.yml", e);
        }
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBrokerAddress);
        props.put(ProducerConfig.RETRIES_CONFIG, 0);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(props);
    }
    public String sendMessage(String message) {
        try {
            // A fresh factory and template are built on every call; a reusable
            // bean would avoid reconnecting to the broker each time
            KafkaTemplate<String, String> kafkaTemplate = new KafkaTemplate<>(producerFactory());
            logger.info("kafka message={}", message);
            // send() is asynchronous; success here means the record was queued, not acknowledged
            kafkaTemplate.send(topic, message);
            logger.info("send message success.");
            return "SUCCESS";
        } catch (Exception e) {
            logger.error("send message fail.", e);
            return "FAIL";
        }
    }
}
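Note: sendMessage() above creates a new ProducerFactory and KafkaTemplate for every message, and producerFactory() parses application.yml by hand. A minimal sketch of the usual alternative, assuming the same kafka.broker.address property can be resolved through Spring property binding; the configuration class and bean names below are illustrative, not part of this commit:

package com.yihu.quota.kafka;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import java.util.HashMap;
import java.util.Map;

// Hypothetical configuration: build the KafkaTemplate once as a Spring bean
// so senders can @Autowired it instead of rebuilding it per call.
@Configuration
public class KafkaProducerConfig {

    // Assumes kafka.broker.address binds from the same application.yml entry
    // that Producer reads manually above.
    @Value("${kafka.broker.address}")
    private String brokerAddress;

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerAddress);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(props));
    }
}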

+ 1 - 1
src/main/java/com/yihu/quota/scheduler/special/OutPatientCostScheduler.java

@@ -8,7 +8,7 @@ import com.yihu.ehr.profile.core.ResourceCore;
import com.yihu.ehr.solr.SolrUtil;
import com.yihu.ehr.util.datetime.DateUtil;
import com.yihu.ehr.util.rest.Envelop;
-import com.yihu.quota.service.scheduler.HealthArchiveSchedulerService;
+import com.yihu.quota.service.special.scheduler.HealthArchiveSchedulerService;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiParam;

+ 1 - 0
src/main/java/com/yihu/quota/service/dimension/TjDimensionMainService.java

@@ -13,6 +13,7 @@ import java.util.List;
 * @date 2017/6/1
 */
@Service
@Deprecated
public class TjDimensionMainService {
    @Autowired
    private JdbcTemplate jdbcTemplate;

+ 1 - 0
src/main/java/com/yihu/quota/service/dimension/TjDimensionSlaveService.java

@@ -13,6 +13,7 @@ import java.util.List;
 * @date 2017/6/1
 */
@Service
@Deprecated
public class TjDimensionSlaveService {
    @Autowired
    private JdbcTemplate jdbcTemplate;

+ 135 - 4
src/main/java/com/yihu/quota/service/job/SingleTableJob.java

@@ -1,13 +1,29 @@
package com.yihu.quota.service.job;
-import org.quartz.DisallowConcurrentExecution;
-import org.quartz.Job;
-import org.quartz.JobExecutionContext;
-import org.quartz.JobExecutionException;
+import com.google.gson.Gson;
+import com.yihu.quota.contants.JobConstant;
+import com.yihu.quota.kafka.Producer;
+import org.quartz.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Scope;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Component;
import org.springframework.web.context.support.SpringBeanAutowiringSupport;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
 * Handles the case where one table maps to one cube, e.g. collecting data from an organization table.
 * Requirements on the source table; tables that do not meet them must be adapted before collection:
 * <p>
 * The table must have a single-column unique key (or primary key); composite keys are not supported.
 * Filtering supports a single column only; multi-column filters are not supported.
 * The filter column must be a time or numeric column; other column types are not supported.
 *
 * @author l4qiang
 * @date 2018-09-18
 */
@@ -15,8 +31,123 @@ import org.springframework.stereotype.Component;
@Scope("prototype")
@DisallowConcurrentExecution
public class SingleTableJob implements Job {
    private static final Logger logger = LoggerFactory.getLogger(SingleTableJob.class);
    /**
     * Source table
     */
    protected String table;
    /**
     * Primary key column of the table
     */
    protected String primeKey;
    /**
     * Filter column
     */
    protected String filterField;
    /**
     * Filter column type
     */
    protected String filterFieldType;
    /**
     * Window step size for fetching
     */
    protected String size;
    /**
     * Start value (time or number)
     */
    protected String start;
    /**
     * End value (time or number)
     */
    protected String end;
    /**
     * Execution type: 1 = manual, 2 = periodic
     */
    protected JobConstant.ExecType execType;
    @Autowired
    private Producer producer;
    @Autowired
    private JdbcTemplate jdbcTemplate;
    @Override
    public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
        prepare(jobExecutionContext);
        cleanData();
        // Walk the filter column from start to end in windows of `size`; fetch()
        // advances `start` after each window, so this loop terminates. The original
        // looped until an empty result, but never advanced the window. Assumes a
        // numeric filter column; time handling is still TODO (see fetch()).
        while (Long.parseLong(start) <= Long.parseLong(end)) {
            saveData(fetch());
        }
    }
    private void prepare(JobExecutionContext jobExecutionContext) {
        // Inject Spring beans into this Quartz-instantiated job
        SpringBeanAutowiringSupport.processInjectionBasedOnCurrentContext(this);
        JobDataMap jobDataMap = jobExecutionContext.getJobDetail().getJobDataMap();
        table = jobDataMap.getString("table");
        primeKey = jobDataMap.getString("primeKey");
        filterField = jobDataMap.getString("filterField");
        filterFieldType = jobDataMap.getString("filterFieldType");
        size = jobDataMap.getString("size");
        start = jobDataMap.getString("start");
        end = jobDataMap.getString("end");
        // execType was never populated in the original, yet cleanData() depends on it.
        // Assumes the scheduler stores it in the JobDataMap under "execType".
        execType = (JobConstant.ExecType) jobDataMap.get("execType");
    }
    private void cleanData() {
        // On a full extraction, tell the downstream consumer to drop all existing rows for this table
        if (JobConstant.ExecType.Full.equals(execType)) {
            Map<String, Object> dataMap = new HashMap<>(2);
            dataMap.put("table", table);
            dataMap.put("delAll", true);
            Gson gson = new Gson();
            String jsonData = gson.toJson(dataMap);
            producer.sendMessage(jsonData);
        }
    }
    private void saveData(List<Map<String, Object>> list) {
        if (list == null) {
            logger.warn("no data fetched");
            return;
        }
        list.forEach(item -> {
            Map<String, Object> dataMap = new HashMap<>(item.size());
            dataMap.put("table", table);
            item.forEach((key, value) -> {
                // the primary key column doubles as the row key sent downstream
                if (key.equals(primeKey)) {
                    dataMap.put("rowKey", value);
                }
                dataMap.put(key, value);
            });
            Gson gson = new Gson();
            String jsonData = gson.toJson(dataMap);
            producer.sendMessage(jsonData);
        });
    }
    private List<Map<String, Object>> fetch() {
        //TODO: branch on filterFieldType (time vs. number); only numeric columns work for now
        // The original built the upper bound as the string concatenation "start + size";
        // compute it numerically instead so the window is correct and actually advances.
        long lower = Long.parseLong(start);
        long upper = lower + Long.parseLong(size);
        String sql = "select * from " + table +
                " where " + filterField + " >= " + lower + " and " + filterField + " < " + upper +
                " and " + filterField + " <= " + end;
        // Advance the window for the next iteration of the loop in execute()
        start = String.valueOf(upper);
        return jdbcTemplate.queryForList(sql);
    }
}
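Note: each row reaches the sep-hbase-data topic as a JSON object in which the primary key is duplicated under rowKey, e.g. {"table":"org_info","rowKey":"1","id":"1","name":"..."}. The keys read in prepare() imply the job is registered roughly as follows; a minimal sketch against the Quartz 2.x builder API, where the table and column names are purely illustrative and `scheduler` is assumed to be an available org.quartz.Scheduler:

// Hypothetical wiring of SingleTableJob; values must be strings because
// prepare() reads them with JobDataMap.getString().
JobDataMap dataMap = new JobDataMap();
dataMap.put("table", "org_info");          // source table (illustrative)
dataMap.put("primeKey", "id");             // single-column primary key
dataMap.put("filterField", "update_seq");  // numeric filter column
dataMap.put("filterFieldType", "number");
dataMap.put("size", "1000");               // window step
dataMap.put("start", "0");
dataMap.put("end", "100000");
// Assumed key: cleanData() needs execType, see the note in prepare()
dataMap.put("execType", JobConstant.ExecType.Full);

JobDetail job = JobBuilder.newJob(SingleTableJob.class)
        .withIdentity("singleTableJob", "extract")
        .usingJobData(dataMap)
        .build();

Trigger trigger = TriggerBuilder.newTrigger()
        .withIdentity("singleTableTrigger", "extract")
        .startNow()
        .build();

scheduler.scheduleJob(job, trigger);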

+ 1 - 0
src/main/java/com/yihu/quota/service/org/OrgService.java

@@ -12,6 +12,7 @@ import java.util.Map;
 * Created by progr1mmer on 2017/12/29.
 */
@Service
@Deprecated
public class OrgService {
    @Autowired

+ 1 - 0
src/main/java/com/yihu/quota/service/quota/special/DeviceService.java

@@ -14,6 +14,7 @@ import javax.transaction.Transactional;
 */
@Service
@Transactional
@Deprecated
public class DeviceService {
    @Autowired

+ 1 - 0
src/main/java/com/yihu/quota/service/quota/special/HbaseService.java

@@ -21,6 +21,7 @@ import java.util.Map;
 * @author jansney
 */
@Service
@Deprecated
public class HbaseService {
    @Autowired
    HBaseDao hbaseDao;

+ 1 - 0
src/main/java/com/yihu/quota/service/quota/special/SolrStatistsService.java

@@ -19,6 +19,7 @@ import java.util.Map;
 * Created by janseny on 2017/7/7.
 */
@Service
@Deprecated
public class SolrStatistsService {
    private static String core = "HealthProfile";

+ 1 - 0
src/main/java/com/yihu/quota/service/quota/special/StatisticsService.java

@@ -20,6 +20,7 @@ import java.util.*;
 * Created by lyr on 2016/7/21.
 */
@Service
@Deprecated
public class StatisticsService {
    private static String core = "HealthProfile";

+ 2 - 1
src/main/java/com/yihu/quota/service/scheduler/HealthArchiveSchedulerService.java

@@ -1,4 +1,4 @@
-package com.yihu.quota.service.scheduler;
+package com.yihu.quota.service.special.scheduler;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.yihu.ehr.elasticsearch.ElasticSearchUtil;
@@ -25,6 +25,7 @@ import java.util.*;
 * Created by wxw on 2018/3/14.
 */
@Service
@Deprecated
public class HealthArchiveSchedulerService {
    @Autowired