فهرست منبع

统计代码重构

esb 8 سال پیش
والد
کامیت
3b27abddb9

+ 13 - 0
patient-co-statistics/src/main/java/com/yihu/wlyy/statistics/config/AsyncConfig.java

@ -0,0 +1,13 @@
package com.yihu.wlyy.statistics.config;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
/**
 * Created by Administrator on 2016.10.18.
 * 启用多綫程
 */
@Configuration
@EnableAsync
public class AsyncConfig {
}

+ 7 - 6
patient-co-statistics/src/main/java/com/yihu/wlyy/statistics/etl/extract/DBExtract.java

@ -40,8 +40,8 @@ public class DBExtract<T> {
     * @param isMultithreading 是否多线程抽取 默认否
     * @return
     */
    public List<T> extractByPage(Class<T> clazz,String sql,int pageSize,Boolean isMultithreading){
        return dbPageExtract.extractByPage(clazz,sql,pageSize,isMultithreading);
    public List<T> extractByPage(Class<T> clazz,String sql,String countSql,int pageSize,Boolean isMultithreading)throws  Exception{
        return dbPageExtract.extractByPage(clazz,sql,countSql,pageSize,isMultithreading);
    }
    /**
     * 分页抽取
@ -50,9 +50,9 @@ public class DBExtract<T> {
     * @param isMultithreading 是否多线程抽取 默认否
     * @return
     */
    public List<Object> extractByPage(Class<T> clazz,String sql,Boolean isMultithreading){
    public List<Object> extractByPage(Class<T> clazz,String sql,String countSql,Boolean isMultithreading)throws  Exception{
        int pageSize=10000;
        return dbPageExtract.extractByPage(clazz,sql,pageSize,isMultithreading);
        return dbPageExtract.extractByPage(clazz,sql,countSql,pageSize,isMultithreading);
    }
    /**
     * 分页抽取
@ -60,9 +60,10 @@ public class DBExtract<T> {
     * @param sql
     * @return
     */
    public List<Object> extractByPage(Class<T> clazz,String sql){
    public List<Object> extractByPage(Class<T> clazz,String sql)throws  Exception{
        int pageSize=10000;
        Boolean isMultithreading=false;
        return dbPageExtract.extractByPage(clazz,sql,pageSize,isMultithreading);
        String countSql="";
        return dbPageExtract.extractByPage(clazz,sql,countSql,pageSize,isMultithreading);
    }
}

+ 100 - 2
patient-co-statistics/src/main/java/com/yihu/wlyy/statistics/etl/extract/DBPageExtract.java

@ -4,10 +4,17 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Scope;
import org.springframework.jdbc.core.BeanPropertyRowMapper;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.AsyncResult;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
/**
 * Created by Administrator on 2016.10.16.
@ -19,14 +26,105 @@ public class DBPageExtract<T> {
    @Autowired
    private JdbcTemplate jdbcTemplate;
    public List<T> extractByPage(Class<T> clazz, String sql, int pageSize, Boolean isMultithreading){
     private  List<T> returnList=new ArrayList<T>();
    public List<T> extractByPage(Class<T> clazz, String sql,String countSql, int pageSize, Boolean isMultithreading) throws Exception{
        if(isMultithreading){
            return noMultiThreadExtract(clazz, sql, pageSize);
            if(StringUtils.isEmpty(countSql)){
                throw new Exception("countSql is null");
            }
            return MultiThreadExtract(clazz, sql,countSql, pageSize);
        }else{
            return noMultiThreadExtract(clazz, sql, pageSize);
        }
    }
    /**
     * 多线程抽取数据
     * @param clazz
     * @param sql
     * @param pageSize
     * @return
     */
    private List<T> MultiThreadExtract(Class<T> clazz, String sql,String countSql, int pageSize)  {
        try{
            //得到数据的总数
            Integer dataCount=getCount(countSql);
            //根据count 计算出 总共要执行几次
            Integer forCount=getForCount(dataCount,pageSize);
            //綫程返回值的值
            List<AsyncResult<Boolean>> asyncResultPool=new ArrayList<AsyncResult<Boolean>>();
            for(int page=0;page<forCount;page++){
                //启动多线程采集数据
                AsyncResult<Boolean> future= multiExtractData(sql,page*pageSize,pageSize,clazz);
                asyncResultPool.add(future);
            }
            ///循环判断,等待获取结果信息
            while (true){
                //得到迭代器
                Iterator<AsyncResult<Boolean>> asyncResultIterator=asyncResultPool.iterator();
                //判断有没有下一个
                while (asyncResultIterator.hasNext()){
                    AsyncResult<Boolean> asyncResult= asyncResultIterator.next();
                    if(asyncResult.isDone()){
                        //如果做完了就移除迭代器
                        asyncResultIterator.remove();
                    }
                }
                //如果一直移除到没有下一个就跳出循环
                if(!asyncResultIterator.hasNext()){
                    break;
                }
            }
            return returnList;
        }catch (Exception e){
            //如果多线程错误 就改用单线程
            return noMultiThreadExtract(clazz,sql,pageSize);
        }
    }
    private Integer getForCount(Integer dataCount, int pageSize) {
        //根据所有的数据取余数
        int yushu=dataCount%pageSize;
        int page=dataCount/pageSize;
        if(yushu==0){
            //如果没有余数 返回总的页数
            return page;
        }else{
            //如果有余数 页数多加1
            page++;
            return page;
        }
    }
    private Integer getCount(String countSql) {
        Integer countMap= jdbcTemplate.queryForObject(countSql,Integer.class);
        return countMap;
    }
    /**
     * 多线程采集数据
     * @param sql
     * @param start
     * @param pageSize
     * @param clazz
     * @return
     */
    @Async
    private AsyncResult<Boolean> multiExtractData(String sql, int start, int pageSize, Class<T> clazz) {
        String finalSql=sql+" limit "+start+","+pageSize; //拼凑分页的语句
        List<T> listTemp= jdbcTemplate.query(finalSql,new BeanPropertyRowMapper(clazz));
        returnList.addAll(listTemp);
        return new AsyncResult<>(true);
    }
    /**
     * 不用多线程抽取
     * @param clazz
     * @param sql
     * @param pageSize
     * @return
     */
    private List<T> noMultiThreadExtract(Class<T> clazz, String sql, int pageSize) {
        List<T> returnList=new ArrayList<T>();
        int start=0;

+ 2 - 0
patient-co-statistics/src/main/java/com/yihu/wlyy/statistics/etl/storage/DBStorage.java

@ -18,6 +18,7 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Scope;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.core.PreparedStatementCreator;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
@ -772,6 +773,7 @@ public class DBStorage   {
        return wlyyQuotaResult;
    }
    @Async
    @Transactional
    private void saveAll(List<WlyyQuotaResult> wlyyQuotaResults) throws  Exception{
        wlyyQuotaResultDao.save(wlyyQuotaResults);

+ 2 - 1
patient-co-statistics/src/main/java/com/yihu/wlyy/statistics/job/business/AllSignExpenseStatusJob.java

@ -99,8 +99,9 @@ public class AllSignExpenseStatusJob implements Job{
            String dateTemp = date + Constant.quota_date_last;
            String sql=" select id,code,idcard,hospital,admin_team_code,expenses_status from wlyy_sign_family a where  a.type =2 and  a.apply_date< '"+dateTemp+"' ";
            String sqlCount=" select count(id) from wlyy_sign_family a where  a.type =2 and  a.apply_date< '"+dateTemp+"'";
            //抽取數據
            List<SignFamily> signFamilies= dbExtract.extract(SignFamily.class,sql);
            List<SignFamily> signFamilies= dbExtract.extractByPage(SignFamily.class,sql,sqlCount,true);
            //清洗數據
            FilterModel etlModels= signDataFilter.filter(signFamilies, SignDataFilter.level2Expenses,sql,date);
            //统计数据

+ 2 - 1
patient-co-statistics/src/main/java/com/yihu/wlyy/statistics/job/business/AllSignJob.java

@ -85,8 +85,9 @@ public class AllSignJob implements Job {
            quartzJobLog.setJobName(wlyyJobConfig.getJobName());
            String dateTemp = date + Constant.quota_date_last;
            String sql=" select id,code,idcard,hospital,admin_team_code,expenses_status from wlyy_sign_family a where  a.type =2  and expenses_status=1 and a.expenses_time< '"+dateTemp+"'" ;
            String sqlCount="select count(id) from wlyy_sign_family a where  a.type =2  and expenses_status=1 and a.expenses_time< '"+dateTemp+"'";
            //抽取數據 分页抽取
            List<SignFamily> signFamilies= dbExtract.extractByPage(SignFamily.class,sql);
            List<SignFamily> signFamilies= dbExtract.extractByPage(SignFamily.class,sql,sqlCount,true);
            //清洗數據
            FilterModel etlModels= signDataFilter.filter(signFamilies, SignDataFilter.level2Sex,sql,date);
            //统计数据

+ 0 - 15
patient-co-statistics/src/main/java/com/yihu/wlyy/statistics/job/business/SignJob.java

@ -253,19 +253,4 @@ public class SignJob implements Job {
        return dateString;
    }
    private String saveContent(List<SignFamily> signFamilys, Long qkCount, Long orgCount, Long townCount, Long cityCount, boolean isAll, StringBuffer errorContent,Long errorCount,String sql) {
        StringBuffer string=new StringBuffer("统计"+yesterday+" 的签约数据完成 ,数据库查询到签约数目:"+signFamilys.size());
        string.append(",sql语句:"+sql);
        string.append(",过滤的脏数据数目:"+errorCount);
        string.append(",统计到市的数据总数:"+cityCount);
        string.append(",统计到区的数据总数:"+townCount);
        string.append(",统计到机构的数据总数:"+orgCount);
        string.append(",统计到团队的数据总数:"+qkCount);
        string.append(",是否统计成功:"+isAll);
        if(!isAll){
            string.append(",失败原因:"+errorContent);
        }
        return string.toString();
    }
}