فهرست منبع

Merge remote-tracking branch 'origin/dev-1.13.0' into dev-1.13.0

jkzlzhoujie 6 سال پیش
والد
کامیت
47ad78f6ac
28فایلهای تغییر یافته به همراه617 افزوده شده و 110 حذف شده
  1. 7 10
      src/main/java/com/yihu/quota/config/AsyncConfig.java
  2. 4 2
      src/main/java/com/yihu/quota/config/quota/JobFactory.java
  3. 19 14
      src/main/java/com/yihu/quota/config/quota/SchedulerConfig.java
  4. 119 0
      src/main/java/com/yihu/quota/contants/JobConstant.java
  5. 3 3
      src/main/java/com/yihu/quota/controller/JobController.java
  6. 1 0
      src/main/java/com/yihu/quota/dao/dict/SystemDictDao.java
  7. 1 0
      src/main/java/com/yihu/quota/dao/dict/SystemDictListDao.java
  8. 1 2
      src/main/java/com/yihu/quota/dao/dimension/TjDimensionSlaveDao.java
  9. 7 2
      src/main/java/com/yihu/quota/etl/formula/FormulaExecutor.java
  10. 71 0
      src/main/java/com/yihu/quota/kafka/Producer.java
  11. 1 1
      src/main/java/com/yihu/quota/scheduler/special/OutPatientCostScheduler.java
  12. 1 0
      src/main/java/com/yihu/quota/service/dimension/TjDimensionMainService.java
  13. 1 0
      src/main/java/com/yihu/quota/service/dimension/TjDimensionSlaveService.java
  14. 2 2
      src/main/java/com/yihu/quota/service/job/EsQuotaJob.java
  15. 1 0
      src/main/java/com/yihu/quota/service/job/EsQuotaPercentJob.java
  16. 57 56
      src/main/java/com/yihu/quota/service/job/JobService.java
  17. 185 0
      src/main/java/com/yihu/quota/service/job/SingleTableJob.java
  18. 1 0
      src/main/java/com/yihu/quota/service/org/OrgService.java
  19. 1 0
      src/main/java/com/yihu/quota/service/quota/special/DeviceService.java
  20. 1 0
      src/main/java/com/yihu/quota/service/quota/special/HbaseService.java
  21. 1 0
      src/main/java/com/yihu/quota/service/quota/special/SolrStatistsService.java
  22. 1 0
      src/main/java/com/yihu/quota/service/quota/special/StatisticsService.java
  23. 2 1
      src/main/java/com/yihu/quota/service/scheduler/HealthArchiveSchedulerService.java
  24. 20 17
      src/main/java/com/yihu/quota/util/QuartzHelper.java
  25. 23 0
      src/main/java/com/yihu/quota/util/sql/Db.java
  26. 50 0
      src/main/java/com/yihu/quota/util/sql/DbKit.java
  27. 18 0
      src/main/java/com/yihu/quota/util/sql/MysqlDb.java
  28. 18 0
      src/main/java/com/yihu/quota/util/sql/OracleDb.java

+ 7 - 10
src/main/java/com/yihu/quota/config/AsyncConfig.java

@ -9,7 +9,8 @@ import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.Executor;
/**
 * Created by Administrator on 2016.10.18.
 * @author Administrator
 * @date 2016.10.18
 * 启用多綫程
 */
@Configuration
@ -30,19 +31,15 @@ public class AsyncConfig {
    @Bean
    public Executor dbExtractExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(corePoolSize);
        executor.setMaxPoolSize(maxPoolSize);
        executor.setQueueCapacity(queueCapacity);
        // rejection-policy:当pool已经达到max size的时候,如何处理新任务
        // CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        executor.initialize();
        return executor;
        return createExecutor();
    }
    @Bean
    public Executor dbStorageExecutor() {
        return createExecutor();
    }
    private Executor createExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(corePoolSize);
        executor.setMaxPoolSize(maxPoolSize);

+ 4 - 2
src/main/java/com/yihu/quota/config/quota/JobFactory.java

@ -7,18 +7,20 @@ import org.springframework.scheduling.quartz.AdaptableJobFactory;
import org.springframework.stereotype.Component;
/**
 * Created by Administrator on 2016.10.12.
 * @author Administrator
 * @date 2016.10.12
 * 為了讓quartz種可以使用Spring的注入
 */
@Component("jobFactory")
public class JobFactory extends AdaptableJobFactory {
    @Autowired
    private AutowireCapableBeanFactory capableBeanFactory;
    @Override
    protected Object createJobInstance(TriggerFiredBundle bundle) throws Exception {
        // 调用父类的方法
        Object jobInstance = super.createJobInstance(bundle);
       // 进行注入
        // 进行注入
        capableBeanFactory.autowireBean(jobInstance);
        return jobInstance;
    }

+ 19 - 14
src/main/java/com/yihu/quota/config/quota/SchedulerConfig.java

@ -1,4 +1,5 @@
package com.yihu.quota.config.quota;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.PropertiesFactoryBean;
import org.springframework.context.ApplicationContext;
@ -12,7 +13,8 @@ import java.io.IOException;
import java.util.Properties;
/**
 * Created by chenweida on 2016/2/3.
 * @author chenweida
 * @date 2016/2/3
 */
@Configuration
public class SchedulerConfig {
@ -22,22 +24,10 @@ public class SchedulerConfig {
    private JobFactory jobFactory;
    @Autowired
    private DataSource dataSource;
    @Bean
    SchedulerFactoryBean schedulerFactoryBean() throws IOException {
        SchedulerFactoryBean bean = new SchedulerFactoryBean();
        bean.setJobFactory(jobFactory);
        bean.setApplicationContext(this.applicationContext);
        bean.setOverwriteExistingJobs(true);
        bean.setStartupDelay(20);// 延时启动
        bean.setSchedulerName("schedulerFactoryBeanCWD");
        bean.setAutoStartup(true);
        bean.setDataSource(dataSource);
        bean.setQuartzProperties(quartzProperties());
        return bean;
    }
    /**
     * quartz配置文件
     *
     * @return
     * @throws IOException
     */
@ -48,5 +38,20 @@ public class SchedulerConfig {
        propertiesFactoryBean.afterPropertiesSet();
        return propertiesFactoryBean.getObject();
    }
    @Bean
    SchedulerFactoryBean schedulerFactoryBean() throws IOException {
        SchedulerFactoryBean bean = new SchedulerFactoryBean();
        bean.setJobFactory(jobFactory);
        bean.setApplicationContext(this.applicationContext);
        bean.setOverwriteExistingJobs(true);
        // 延时启动
        bean.setStartupDelay(20);
        bean.setSchedulerName("schedulerFactoryBeanCWD");
        bean.setAutoStartup(true);
        bean.setDataSource(dataSource);
        bean.setQuartzProperties(quartzProperties());
        return bean;
    }
}

+ 119 - 0
src/main/java/com/yihu/quota/contants/JobConstant.java

@ -0,0 +1,119 @@
package com.yihu.quota.contants;
/**
 * @author l4qiang
 */
public interface JobConstant {
    /**
     * 任务状态
     */
    public enum Status {
        /**
         * 未执行
         */
        NotStart,
        /**
         * 正在执行
         */
        Running;
        public static Status fromInt(int x) {
            switch (x) {
                case 0:
                    return NotStart;
                case 1:
                    return Running;
                default:
                    return null;
            }
        }
        public static Status fromStr(String x) {
            switch (x) {
                case "0":
                    return NotStart;
                case "1":
                    return Running;
                default:
                    return null;
            }
        }
    }
    public enum ExecType {
        /**
         * 全量
         */
        Full,
        /**
         * 增量
         */
        Increment;
        public static ExecType fromInt(int x) {
            switch (x) {
                case 0:
                    return Full;
                case 1:
                    return Increment;
                default:
                    return null;
            }
        }
        public static ExecType fromStr(String x) {
            switch (x) {
                case "0":
                    return Full;
                case "1":
                    return Increment;
                default:
                    return null;
            }
        }
    }
    public enum ExecWay {
        /**
         * 手动执行
         */
        Manual,
        /**
         * Cron触发
         */
        Cron,
        /**
         * 事件触发
         */
        Trigger;
        public static ExecWay fromInt(int x) {
            switch (x) {
                case 0:
                    return Manual;
                case 1:
                    return Cron;
                case 2:
                    return Trigger;
                default:
                    return null;
            }
        }
        public static ExecWay fromStr(String x) {
            switch (x) {
                case "0":
                    return Manual;
                case "1":
                    return Cron;
                case "2":
                    return Trigger;
                default:
                    return null;
            }
        }
    }
}

+ 3 - 3
src/main/java/com/yihu/quota/controller/JobController.java

@ -37,7 +37,7 @@ public class JobController extends BaseController {
            @ApiParam(name = "id", value = "指标ID", required = true)
            @RequestParam(value = "id", required = true) Integer id) {
        try {
            jobService.executeJob(id, "1", null, null);
            jobService.execute(id, "1", null, null);
            return true;
        } catch (Exception e) {
            error(e);
@ -59,7 +59,7 @@ public class JobController extends BaseController {
            @ApiParam(name = "endDate", value = "截止日期")
            @RequestParam(value = "endDate", required = false) String endDate) {
        try {
            jobService.executeJob(id, "2", startDate, endDate);
            jobService.execute(id, "2", startDate, endDate);
            return true;
        } catch (Exception e) {
            error(e);
@ -74,7 +74,7 @@ public class JobController extends BaseController {
            @ApiParam(name = "id", value = "指标ID", required = true)
            @RequestParam(value = "id", required = true) Integer id) {
        try {
            jobService.removeJob(id);
            jobService.stop(id);
            return true;
        } catch (Exception e) {
            error(e);

+ 1 - 0
src/main/java/com/yihu/quota/dao/dict/SystemDictDao.java

@ -7,5 +7,6 @@ import org.springframework.data.repository.PagingAndSortingRepository;
/**
 * Created by chenweida on 2017/6/1.
 */
@Deprecated
public interface SystemDictDao extends PagingAndSortingRepository<SystemDict, Long>, JpaSpecificationExecutor<SystemDict> {
}

+ 1 - 0
src/main/java/com/yihu/quota/dao/dict/SystemDictListDao.java

@ -7,5 +7,6 @@ import org.springframework.data.repository.PagingAndSortingRepository;
/**
 * Created by chenweida on 2017/6/1.
 */
@Deprecated
public interface SystemDictListDao extends PagingAndSortingRepository<SystemDictList, Long>, JpaSpecificationExecutor<SystemDictList> {
}

+ 1 - 2
src/main/java/com/yihu/quota/dao/dimension/TjDimensionSlaveDao.java

@ -7,6 +7,5 @@ import org.springframework.data.repository.PagingAndSortingRepository;
/**
 * Created by chenweida on 2017/6/1.
 */
public interface TjDimensionSlaveDao extends PagingAndSortingRepository<TjDimensionSlave, Long>, JpaSpecificationExecutor<TjDimensionSlave
        > {
public interface TjDimensionSlaveDao extends PagingAndSortingRepository<TjDimensionSlave, Long>, JpaSpecificationExecutor<TjDimensionSlave> {
}

+ 7 - 2
src/main/java/com/yihu/quota/etl/formula/FormulaExecutor.java

@ -15,7 +15,7 @@ public class FormulaExecutor {
    /**
     * @param clazz  算法类名,包含完整路径,在配置时维护。如:com.yihu.quota.etl.formula.DivisionFunc
     * @param args   参数,需要替换参数使用${code}方式进行标识,在配置时维护.如:string:${province};number:1
     * @param values 值,需要赋值的值使用${code}方式进行标识
     * @param values 值,需要赋值的值使用code方式进行标识, 如:province,361000
     * @throws Exception
     */
    public void run(String clazz, String args, HashMap<String, String> values) throws Exception {
@ -66,7 +66,12 @@ public class FormulaExecutor {
    }
    private String getValue(HashMap<String, String> values, String defaultValue) {
        String value = values.get(defaultValue);
        String key = defaultValue;
        if (defaultValue.startsWith("${") && defaultValue.endsWith("}")) {
            key = defaultValue.substring(2, defaultValue.lastIndexOf("}"));
        }
        String value = values.get(key);
        if (value != null) {
            return value;
        }

+ 71 - 0
src/main/java/com/yihu/quota/kafka/Producer.java

@ -0,0 +1,71 @@
package com.yihu.quota.kafka;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.stereotype.Component;
import org.yaml.snakeyaml.Yaml;
import java.io.FileInputStream;
import java.net.URL;
import java.util.HashMap;
import java.util.Map;
/**
 * @author janseny
 * @date 2018/9/14
 */
@Component
public class Producer {
    private static final Logger logger = LoggerFactory.getLogger(Producer.class);
    public static String sepTopic = "sep-hbase-data";
    public ProducerFactory<String, String> producerFactory() {
        String kafkaBrokerAddress = "";
        try {
            Yaml yaml = new Yaml();
            URL url = Producer.class.getClassLoader().getResource("application.yml");
            if (url != null) {
                Map map = (Map) yaml.load(new FileInputStream(url.getFile()));
                Map map2 = (Map) map.get("kafka");
                Map map3 = (Map) map2.get("broker");
                kafkaBrokerAddress = map3.get("address").toString();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBrokerAddress);
        props.put(ProducerConfig.RETRIES_CONFIG, 0);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(props);
    }
    public boolean sendMessage(String topic, String message) {
        try {
            if (logger.isInfoEnabled()) {
                logger.info("send Message success.");
            }
            KafkaTemplate<String, String> kafkaTemplate = new KafkaTemplate<>(producerFactory());
            kafkaTemplate.send(topic, message);
            return true;
        } catch (Exception e) {
            if (logger.isErrorEnabled()) {
                logger.error("send Message fail." + "topic:" + topic + ",message:" + message + "error:" + e.getMessage(), e);
            }
            return false;
        }
    }
}

+ 1 - 1
src/main/java/com/yihu/quota/scheduler/special/OutPatientCostScheduler.java

@ -8,7 +8,7 @@ import com.yihu.ehr.profile.core.ResourceCore;
import com.yihu.ehr.solr.SolrUtil;
import com.yihu.ehr.util.datetime.DateUtil;
import com.yihu.ehr.util.rest.Envelop;
import com.yihu.quota.service.scheduler.HealthArchiveSchedulerService;
import com.yihu.quota.service.special.scheduler.HealthArchiveSchedulerService;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiParam;

+ 1 - 0
src/main/java/com/yihu/quota/service/dimension/TjDimensionMainService.java

@ -13,6 +13,7 @@ import java.util.List;
 * @date 2017/6/1
 */
@Service
@Deprecated
public class TjDimensionMainService {
    @Autowired
    private JdbcTemplate jdbcTemplate;

+ 1 - 0
src/main/java/com/yihu/quota/service/dimension/TjDimensionSlaveService.java

@ -13,6 +13,7 @@ import java.util.List;
 * @date 2017/6/1
 */
@Service
@Deprecated
public class TjDimensionSlaveService {
    @Autowired
    private JdbcTemplate jdbcTemplate;

+ 2 - 2
src/main/java/com/yihu/quota/service/job/EsQuotaJob.java

@ -35,7 +35,6 @@ import org.springframework.web.context.support.SpringBeanAutowiringSupport;
import java.util.Date;
import java.util.List;
import java.util.Map;
/**
 * @author chenweida
@ -45,6 +44,7 @@ import java.util.Map;
@Component
@Scope("prototype")
@DisallowConcurrentExecution
@Deprecated
public class EsQuotaJob implements Job {
    @Autowired
    ElasticsearchUtil elasticsearchUtil;
@ -57,6 +57,7 @@ public class EsQuotaJob implements Job {
    private String executeFlag; // 执行动作 1 手动执行 2 周期执行
    private int haveThreadCount = 0;//已完成线程数
    private int threadCount = 1;//总线程数
    @Autowired
    private TjQuotaLogDao tjQuotaLogDao;
    @Autowired
@ -294,7 +295,6 @@ public class EsQuotaJob implements Job {
     */
    private void initParams(JobExecutionContext context) {
        JobDataMap map = context.getJobDetail().getJobDataMap();
        Map<String, Object> params = context.getJobDetail().getJobDataMap();
        Object object = map.get("quota");
        if (object != null) {
            BeanUtils.copyProperties(object, this.quotaVo);

+ 1 - 0
src/main/java/com/yihu/quota/service/job/EsQuotaPercentJob.java

@ -40,6 +40,7 @@ import java.util.List;
@Component
@Scope("prototype")
@DisallowConcurrentExecution
@Deprecated
public class EsQuotaPercentJob implements Job {
    @Autowired
    ElasticsearchUtil elasticsearchUtil;

+ 57 - 56
src/main/java/com/yihu/quota/service/job/JobService.java

@ -1,14 +1,15 @@
package com.yihu.quota.service.job;
import com.yihu.quota.contants.JobConstant;
import com.yihu.quota.dao.TjQuotaDao;
import com.yihu.quota.model.TjQuota;
import com.yihu.quota.util.QuartzHelper;
import com.yihu.quota.vo.QuotaVo;
import org.apache.commons.lang3.StringUtils;
import org.quartz.ObjectAlreadyExistsException;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;
import java.util.HashMap;
import java.util.Map;
@ -18,7 +19,6 @@ import java.util.Map;
 */
@Service
public class JobService {
    @Autowired
    private QuartzHelper quartzHelper;
    @Autowired
@ -26,49 +26,55 @@ public class JobService {
    /**
     * 启动指标任务
     * 启动任务只关心任务是否手动(马上)执行还是任务执行(Cron),对于数据是否初始化(全量),或者增量,这个是具体Job判断的事情。
     * 任务只是执行时间有区别,并且任务不可能同时存在立即执行和周期执行两种行为,不需要增加任务名称来增加复杂度
     *
     * @param id          指标ID
     * @param executeFlag 执行动作标识,1:初始执行(全量统计),2:立即执行、周期执行(增量统计)
     * @param executeFlag 执行动作标识,0:全量执行,1:增量执行。
     * @param startDate   抽取数据起始日期。初始执行时为NULL;立即执行时需要传值;周期执行时也为NULL,如果是基础指标周期执行,后续默认赋值昨天开始。
     * @param endDate     抽取数据截止日期。初始执行时无NULL;立即执行时需要传值;周期执行时也为NULL,如果是基础指标周期执行,后续默认赋值昨天截止。
     * @throws Exception
     */
    public void executeJob(Integer id, String executeFlag, String startDate, String endDate) throws Exception {
    public void execute(Integer id, String executeFlag, String startDate, String endDate) throws Exception {
        TjQuota tjQuota = quotaDao.findOne(id);
        if (tjQuota != null) {
            QuotaVo quotaVo = new QuotaVo();
            BeanUtils.copyProperties(tjQuota, quotaVo);
            Map<String, Object> params = new HashMap<>();
            params.put("quota", quotaVo);
            params.put("executeFlag", executeFlag);
            params.put("startTime", startDate);
            params.put("endTime", endDate);
            String quotaCode = quotaVo.getCode().replace("_", "");
            String quotaCodeImmediately = quotaCode + "immediately";
            boolean existJob = quartzHelper.isExistJob(quotaCode);
            boolean existJobImmediately = quartzHelper.isExistJob(quotaCodeImmediately);
            if (existJob && "0".equals(quotaVo.getJobStatus())) {
                //周期执行jobKey
                quartzHelper.removeJob(quotaCode);
            }
            if (existJobImmediately) {
                //立即执行jobKey
                quartzHelper.removeJob(quotaCodeImmediately);
            }
            //往quartz框架添加任务
            if ((!StringUtils.isEmpty(executeFlag) && executeFlag.equals("1")) || // 初始执行
                    (!StringUtils.isEmpty(tjQuota.getJobClazz()) && tjQuota.getExecType().equals("1"))) { // 立即执行
                try {
                    quartzHelper.startNow(Class.forName(quotaVo.getJobClazz()), quotaCodeImmediately, params);
                } catch (Exception e) {
                    throw new ObjectAlreadyExistsException(quotaCodeImmediately + "," + tjQuota.getName() + "指标正在执行!");
                }
            } else {
                //周期执行指标 更新指标执行状态:0未开启,1执行中
                tjQuota.setJobStatus("1");
                quotaDao.save(tjQuota);
                quartzHelper.addJob(Class.forName(quotaVo.getJobClazz()), quotaVo.getCron(), quotaCode, params);
        if (tjQuota == null) {
            throw new Exception("数据采集任务,id:" + id);
        }
        QuotaVo quotaVo = new QuotaVo();
        BeanUtils.copyProperties(tjQuota, quotaVo);
        Map<String, Object> params = new HashMap<>(4);
        params.put("quota", quotaVo);
        params.put("executeFlag", executeFlag);
        params.put("startTime", startDate);
        params.put("endTime", endDate);
        String jobId = quotaVo.getCode();
        boolean existJobId = quartzHelper.isExistJob(jobId);
        if (existJobId) {
            quartzHelper.removeJob(jobId);
        }
        if (StringUtils.isEmpty(quotaVo.getJobClazz())) {
            throw new Exception("数据采集任务未配置执行类");
        }
        //往quartz框架添加任务
        JobConstant.ExecWay execWay = JobConstant.ExecWay.fromStr(tjQuota.getExecType());
        // 初始执行或者立即执行
        if (JobConstant.ExecWay.Manual.equals(execWay)) {
            try {
                quartzHelper.startNow(Class.forName(quotaVo.getJobClazz()), jobId, params);
            } catch (Exception e) {
                throw new ObjectAlreadyExistsException(jobId + "," + tjQuota.getName() + "指标正在执行!");
            }
        } else {
            //周期执行指标 更新指标执行状态:0未开启,1执行中
            tjQuota.setJobStatus("1");
            quotaDao.save(tjQuota);
            quartzHelper.addJob(Class.forName(quotaVo.getJobClazz()), quotaVo.getCron(), jobId, params);
        }
    }
@ -78,28 +84,23 @@ public class JobService {
     * @param id 指标ID
     * @throws Exception
     */
    public void removeJob(Integer id) throws Exception {
    public void stop(Integer id) throws Exception {
        TjQuota tjQuota = quotaDao.findOne(id);
        if (tjQuota != null) {
            QuotaVo quotaVo = new QuotaVo();
            BeanUtils.copyProperties(tjQuota, quotaVo);
            String quotaCode = quotaVo.getCode().replace("_", "");
            String quotaCodeImmediately = quotaCode + "immediately";
            boolean existJob = quartzHelper.isExistJob(quotaCode);
            boolean existJobImmediately = quartzHelper.isExistJob(quotaCodeImmediately);
            if (existJob) {
                //周期执行jobKey
                quartzHelper.removeJob(quotaCode);
            }
            if (existJobImmediately) {
                //立即执行jobKey
                quartzHelper.removeJob(quotaCodeImmediately);
            }
            //周期执行指标 更新指标执行状态:0未开启,1执行中
            tjQuota.setJobStatus("0");
            quotaDao.save(tjQuota);
        if (tjQuota == null) {
            throw new Exception("数据采集任务,id:" + id);
        }
        QuotaVo quotaVo = new QuotaVo();
        BeanUtils.copyProperties(tjQuota, quotaVo);
        String jobId = quotaVo.getCode();
        boolean existJobId = quartzHelper.isExistJob(jobId);
        if (existJobId) {
            quartzHelper.removeJob(jobId);
        }
        //周期执行指标 更新指标执行状态:0未开启,1执行中
        tjQuota.setJobStatus("0");
        quotaDao.save(tjQuota);
    }
}

+ 185 - 0
src/main/java/com/yihu/quota/service/job/SingleTableJob.java

@ -0,0 +1,185 @@
package com.yihu.quota.service.job;
import com.google.gson.Gson;
import com.yihu.ehr.util.datetime.DateUtil;
import com.yihu.quota.contants.JobConstant;
import com.yihu.quota.kafka.Producer;
import com.yihu.quota.util.sql.DbKit;
import org.quartz.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Scope;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Component;
import org.springframework.web.context.support.SpringBeanAutowiringSupport;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
 * 用于一个表就是一个多维数据集的情况,如组织机构表的数据采集
 * 数据采集表要求,不符合要求的表需要先改造后进行数据采集:
 * <p>
 * 表必须是单字段唯一键(或主键),不支持复合唯一键(或主键)
 * 过滤字段只支持单字段,不支持多字段过滤
 * 过滤字段只支持时间和数字字段,不支持其他类型字段
 *
 * @author l4qiang
 * @date 2018-09-18
 */
@Component
@Scope("prototype")
@DisallowConcurrentExecution
public class SingleTableJob implements Job {
    static private Logger logger = LoggerFactory.getLogger(SingleTableJob.class);
    /**
     * 数据来源表
     */
    protected String table;
    /**
     * 表主键
     */
    protected String primeKey;
    /**
     * 过滤字段
     */
    protected String filterField;
    /**
     * 过滤字段类型
     */
    protected String filterFieldType;
    /**
     * 过滤数据步长
     */
    protected String size;
    /**
     * 开始时间
     */
    protected String start;
    /**
     * 结束时间
     */
    protected String end;
    /**
     * 执行动作 1 手动执行 2 周期执行
     */
    protected JobConstant.ExecType execType;
    @Autowired
    private Producer producer;
    @Autowired
    private JdbcTemplate jdbcTemplate;
    @Override
    public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
        prepare(jobExecutionContext);
        cleanData();
        List<Map<String, Object>> list;
        do {
            list = fetch();
            saveData(list);
        } while (list != null && list.size() != 0);
    }
    private void prepare(JobExecutionContext jobExecutionContext) {
        //spring注入
        SpringBeanAutowiringSupport.processInjectionBasedOnCurrentContext(this);
        JobDataMap jobDataMap = jobExecutionContext.getJobDetail().getJobDataMap();
        table = jobDataMap.getString("table");
        primeKey = jobDataMap.getString("primeKey");
        filterField = jobDataMap.getString("filterField");
        filterFieldType = jobDataMap.getString("filterFieldType");
        size = jobDataMap.getString("size");
        start = jobDataMap.getString("start");
        end = jobDataMap.getString("end");
    }
    private void cleanData() {
        if (JobConstant.ExecType.Full.equals(execType)) {
            Map<String, Object> dataMap = new HashMap<>(2);
            dataMap.put("table", table);
            dataMap.put("delAll", true);
            Gson gson = new Gson();
            String jsonData = gson.toJson(dataMap);
            producer.sendMessage(Producer.sepTopic, jsonData);
        }
    }
    private void saveData(List<Map<String, Object>> list) {
        if (list == null) {
            logger.warn("未获取到数据");
            return;
        }
        list.forEach(item -> {
            Map<String, Object> dataMap = new HashMap<>(item.size());
            dataMap.put("table", table);
            item.forEach((key, value) -> {
                if (key.equals(primeKey)) {
                    dataMap.put("rowKey", value);
                }
                dataMap.put(key, value);
            });
            Gson gson = new Gson();
            String jsonData = gson.toJson(dataMap);
            producer.sendMessage(Producer.sepTopic, jsonData);
        });
    }
    /**
     * TODO:没有设置数据库来源和数据库类型,当前使用默认数据库
     *
     * @return
     */
    private List<Map<String, Object>> fetch() {
        String sql = "";
        if ("number".equals(filterFieldType)) {
            Long lngTemp = Long.parseLong(start) + Long.parseLong(size);
            String temp = lngTemp.toString();
            sql = "select * from " + table +
                    " where " + filterField + ">=" + start + " and " + filterField + "<" + temp + " and " +
                    filterField + "<=" + end;
            start = temp;
        } else if ("date".equals(filterFieldType)) {
            Date date = DateUtil.toDateFromTime(start);
            java.util.Calendar calendar = java.util.Calendar.getInstance();
            calendar.setTime(date);
            calendar.add(java.util.Calendar.HOUR_OF_DAY, Integer.parseInt(size));
            String temp = DateUtil.toString(calendar.getTime(), DateUtil.DEFAULT_YMDHMSDATE_FORMAT);
            sql = "select * from " + table +
                    " where " + filterField + ">=" + DbKit.use().getLongDate(start) + " and " + filterField + "<" + DbKit.use().getLongDate(temp) + " and " +
                    filterField + "<=" + DbKit.use().getLongDate(end);
            start = temp;
        } else {
            logger.warn("不支持的过滤字段类型");
            return null;
        }
        return jdbcTemplate.queryForList(sql);
    }
}

+ 1 - 0
src/main/java/com/yihu/quota/service/org/OrgService.java

@ -12,6 +12,7 @@ import java.util.Map;
 * Created by progr1mmer on 2017/12/29.
 */
@Service
@Deprecated
public class OrgService {
    @Autowired

+ 1 - 0
src/main/java/com/yihu/quota/service/quota/special/DeviceService.java

@ -14,6 +14,7 @@ import javax.transaction.Transactional;
 */
@Service
@Transactional
@Deprecated
public class DeviceService {
    @Autowired

+ 1 - 0
src/main/java/com/yihu/quota/service/quota/special/HbaseService.java

@ -21,6 +21,7 @@ import java.util.Map;
 * @author jansney
 */
@Service
@Deprecated
public class HbaseService {
    @Autowired
    HBaseDao hbaseDao;

+ 1 - 0
src/main/java/com/yihu/quota/service/quota/special/SolrStatistsService.java

@ -19,6 +19,7 @@ import java.util.Map;
 * Created by janseny on 2017/7/7.
 */
@Service
@Deprecated
public class SolrStatistsService {
    private static String core = "HealthProfile";

+ 1 - 0
src/main/java/com/yihu/quota/service/quota/special/StatisticsService.java

@ -20,6 +20,7 @@ import java.util.*;
 * Created by lyr on 2016/7/21.
 */
@Service
@Deprecated
public class StatisticsService {
    private static String core = "HealthProfile";

+ 2 - 1
src/main/java/com/yihu/quota/service/scheduler/HealthArchiveSchedulerService.java

@ -1,4 +1,4 @@
package com.yihu.quota.service.scheduler;
package com.yihu.quota.service.special.scheduler;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.yihu.ehr.elasticsearch.ElasticSearchUtil;
@ -25,6 +25,7 @@ import java.util.*;
 * Created by wxw on 2018/3/14.
 */
@Service
@Deprecated
public class HealthArchiveSchedulerService {
    @Autowired

+ 20 - 17
src/main/java/com/yihu/quota/util/QuartzHelper.java

@ -12,6 +12,9 @@ import java.util.Map;
import static org.quartz.SimpleScheduleBuilder.simpleSchedule;
/**
 * @author administrator
 */
@Component("quartzHelper")
public class QuartzHelper {
    @Autowired
@ -29,22 +32,20 @@ public class QuartzHelper {
        }
    }
    public void addJob(Class jobClass, String cronString, String jobKey,
                       Map<String, Object> params) throws Exception {
        if (!CronExpression.isValidExpression(cronString)) {
    public void addJob(Class jobClass, String cron, String id, Map<String, Object> params) throws Exception {
        if (!CronExpression.isValidExpression(cron)) {
            throw new Exception("cronExpression is not a valid Expression");
        }
        try {
            JobDetail job = JobBuilder.newJob(jobClass)
                    .withIdentity("job-id:" + jobKey, "job-group:" + jobKey)
                    .withIdentity("job-id:" + id, "job-group:" + id)
                    .build();
            JobDataMap jobDataMap = job.getJobDataMap();
            jobDataMap.putAll(params);
            CronTrigger trigger = TriggerBuilder
                    .newTrigger()
                    .withIdentity("trigger-name:" + jobKey,
                            "trigger-group:" + jobKey)
                    .withSchedule(CronScheduleBuilder.cronSchedule(cronString))
                    .withIdentity("trigger-name:" + id, "trigger-group:" + id)
                    .withSchedule(CronScheduleBuilder.cronSchedule(cron))
                    .build();
            scheduler.scheduleJob(job, trigger);
            scheduler.start();
@ -53,18 +54,20 @@ public class QuartzHelper {
        }
    }
    public void removeJob(String jobKeyString) throws Exception {
        TriggerKey triggerKey = new TriggerKey("trigger-name:" + jobKeyString,
                "trigger-group:" + jobKeyString);
        JobKey jobName = new JobKey("job-group:" + jobKeyString, "job-id:"
                + jobKeyString);
        scheduler.pauseTrigger(triggerKey);// 停止触发器
        scheduler.unscheduleJob(triggerKey);// 移除触发器
        scheduler.deleteJob(jobName);// 删除任务
    public void removeJob(String id) throws Exception {
        TriggerKey triggerKey = new TriggerKey("trigger-name:" + id, "trigger-group:" + id);
        JobKey jobKey = new JobKey("job-group:" + id, "job-id:" + id);
        // 停止触发器
        scheduler.pauseTrigger(triggerKey);
        // 移除触发器
        scheduler.unscheduleJob(triggerKey);
        // 删除任务
        scheduler.deleteJob(jobKey);
    }
    public boolean isExistJob(String jobKey) throws SchedulerException {
        JobKey jk = new JobKey("job-id:" + jobKey, "job-group:" + jobKey);
    public boolean isExistJob(String id) throws SchedulerException {
        JobKey jk = new JobKey("job-id:" + id, "job-group:" + id);
        if (scheduler.checkExists(jk)) {
            return true;
        } else {

+ 23 - 0
src/main/java/com/yihu/quota/util/sql/Db.java

@ -0,0 +1,23 @@
package com.yihu.quota.util.sql;
public interface Db {
    String getDate(String value);
    String getLongDate(String value);
    String getNumber(String value);
    /**
     * 支持的数据类型
     */
    public enum Type {
        /**
         * oracle
         */
        oracle,
        /**
         * mysql
         */
        mysql
    }
}

+ 50 - 0
src/main/java/com/yihu/quota/util/sql/DbKit.java

@ -0,0 +1,50 @@
package com.yihu.quota.util.sql;
import java.util.HashMap;
public class DbKit {
    private Db defaultDb;
    private HashMap<Db.Type, Db> mapDB = new HashMap<>();
    private DbKit(Db.Type type) {
        Db db = mapDB.get(type);
        if (db != null) {
            defaultDb = db;
            return;
        }
        switch (type) {
            case mysql:
                db = new MysqlDb();
                break;
            case oracle:
                db = new OracleDb();
                break;
            default:
                break;
        }
        defaultDb = db;
        mapDB.put(type, db);
    }
    public static DbKit use() {
        return use(Db.Type.mysql);
    }
    public static DbKit use(Db.Type type) {
        return new DbKit(type);
    }
    public String getDate(String value) {
        return defaultDb.getDate(value);
    }
    public String getLongDate(String value) {
        return defaultDb.getLongDate(value);
    }
    public String getNumber(String value) {
        return defaultDb.getNumber(value);
    }
}

+ 18 - 0
src/main/java/com/yihu/quota/util/sql/MysqlDb.java

@ -0,0 +1,18 @@
package com.yihu.quota.util.sql;
public class MysqlDb implements Db {
    @Override
    public String getDate(String value) {
        return "str_to_date('" + value + "','%Y-%M-%D')";
    }
    @Override
    public String getLongDate(String value) {
        return "str_to_date('" + value + "','%Y-%m-%d %H:%i:%S')";
    }
    @Override
    public String getNumber(String value) {
        return "cast(" + value + " as signed integer)";
    }
}

+ 18 - 0
src/main/java/com/yihu/quota/util/sql/OracleDb.java

@ -0,0 +1,18 @@
package com.yihu.quota.util.sql;
public class OracleDb implements Db {
    @Override
    public String getDate(String value) {
        return "to_date('" + value + "','YYYY-MM-DD')";
    }
    @Override
    public String getLongDate(String value) {
        return "to_date('" + value + "','YYYY-MM-DD HH24:MI:SS')";
    }
    @Override
    public String getNumber(String value) {
        return "to_number(" + value + ")";
    }
}