Explorar o código

统计代码重构

esb %!s(int64=8) %!d(string=hai) anos
pai
achega
2ec3167076

+ 37 - 1
patient-co-statistics/src/main/java/com/yihu/wlyy/statistics/config/AsyncConfig.java

@ -1,7 +1,11 @@
package com.yihu.wlyy.statistics.config;
import java.util.concurrent.Executor;
import org.apache.tomcat.util.threads.ThreadPoolExecutor;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
/**
 * Created by Administrator on 2016.10.18.
@ -9,5 +13,37 @@ import org.springframework.scheduling.annotation.EnableAsync;
 */
@Configuration
@EnableAsync
public class AsyncConfig {
public class AsyncConfig  {
    /** 如果池中的实际线程数小于corePoolSize,无论是否其中有空闲的线程,都会给新的任务产生新的线程 */
    private int corePoolSize = 5;
    /** 如果池中的线程数=maximumPoolSize,则有空闲线程使用空闲线程,否则新任务放入queueCapacity. */
    private int maxPoolSize = 20;
    /** 缓冲队列. */
    private int queueCapacity = 10;
    @Bean
    public Executor dbExtractExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(corePoolSize);
        executor.setMaxPoolSize(maxPoolSize);
        executor.setQueueCapacity(queueCapacity);
        // rejection-policy:当pool已经达到max size的时候,如何处理新任务
        // CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        executor.initialize();
        return executor;
    }
    @Bean
    public Executor dbStorageExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(corePoolSize);
        executor.setMaxPoolSize(maxPoolSize);
        executor.setQueueCapacity(queueCapacity);
        // rejection-policy:当pool已经达到max size的时候,如何处理新任务
        // CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        executor.initialize();
        return executor;
    }
}

+ 30 - 28
patient-co-statistics/src/main/java/com/yihu/wlyy/statistics/etl/extract/DBPageExtract.java

@ -39,6 +39,33 @@ public class DBPageExtract<T> {
        }
    }
    /**
     * 不用多线程抽取
     * @param clazz
     * @param sql
     * @param pageSize
     * @return
     */
    private List<T> noMultiThreadExtract(Class<T> clazz, String sql, int pageSize) {
        List<T> returnList=new ArrayList<T>();
        int start=0;
        int page=1;
        while (true){
            String finalSql=sql+" limit "+start+","+pageSize; //拼凑分页的语句
            List<T> listTemp= jdbcTemplate.query(finalSql,new BeanPropertyRowMapper(clazz));
            returnList.addAll(listTemp);//添加到list里面
            //判断是都是最后页面
            if(listTemp.size()!=pageSize){
                listTemp.clear();
                break;
            }else{
                start =page*pageSize;
                page++;
                listTemp.clear();
            }
        }
        return returnList;
    }
    /**
     * 多线程抽取数据
     * @param clazz
@ -69,6 +96,8 @@ public class DBPageExtract<T> {
                    if(asyncResult.isDone()){
                        //如果做完了就移除迭代器
                        asyncResultIterator.remove();
                    }else{
                        Thread.sleep(500L);
                    }
                }
                //如果一直移除到没有下一个就跳出循环
@ -110,7 +139,7 @@ public class DBPageExtract<T> {
     * @param clazz
     * @return
     */
    @Async
    @Async("dbExtractExecutor")
    private AsyncResult<Boolean> multiExtractData(String sql, int start, int pageSize, Class<T> clazz) {
        String finalSql=sql+" limit "+start+","+pageSize; //拼凑分页的语句
        List<T> listTemp= jdbcTemplate.query(finalSql,new BeanPropertyRowMapper(clazz));
@ -118,32 +147,5 @@ public class DBPageExtract<T> {
        return new AsyncResult<>(true);
    }
    /**
     * 不用多线程抽取
     * @param clazz
     * @param sql
     * @param pageSize
     * @return
     */
    private List<T> noMultiThreadExtract(Class<T> clazz, String sql, int pageSize) {
        List<T> returnList=new ArrayList<T>();
        int start=0;
        int page=1;
        while (true){
            String finalSql=sql+" limit "+start+","+pageSize; //拼凑分页的语句
            List<T> listTemp= jdbcTemplate.query(finalSql,new BeanPropertyRowMapper(clazz));
            returnList.addAll(listTemp);//添加到list里面
            //判断是都是最后页面
            if(listTemp.size()!=pageSize){
                listTemp.clear();
                break;
            }else{
                start =page*pageSize;
                page++;
                listTemp.clear();
            }
        }
        return returnList;
    }
}

+ 1 - 1
patient-co-statistics/src/main/java/com/yihu/wlyy/statistics/etl/storage/DBStorage.java

@ -773,7 +773,7 @@ public class DBStorage   {
        return wlyyQuotaResult;
    }
    @Async
    @Async("dbStorageExecutor")
    @Transactional
    private void saveAll(List<WlyyQuotaResult> wlyyQuotaResults) throws  Exception{
        wlyyQuotaResultDao.save(wlyyQuotaResults);