Prechádzať zdrojové kódy

内网签约数据同步线程代码改造

huangwenjie 7 rokov pred
rodič
commit
f5878f42a4

+ 6 - 6
patient-co-service/wlyy_sign/src/main/java/com/yihu/wlyy/sign/common/thread/LoadNewSignThread.java

@ -16,7 +16,7 @@ public class LoadNewSignThread implements Runnable {
    int retryTime =  30000;
    //空闲时间
    Integer nightHours = 22;
    Integer nightHours = 23;
    Integer morningHours = 6;
    //默认线程间隔
@ -29,8 +29,8 @@ public class LoadNewSignThread implements Runnable {
                Date now = new Date();
                //判断非空闲时候
//                if(now.getHours()>=morningHours &&  now.getHours() < nightHours)
//                {
                if(now.getHours()>=morningHours &&  now.getHours() < nightHours)
                {
                    //签约更新
                    SignZYService signZYService = (SignZYService) SpringContextHolder.getSpringBean("SignZYService");
                    SystemDictService systemDictService = (SystemDictService) SpringContextHolder.getSpringBean("SystemDictService");
@ -56,9 +56,9 @@ public class LoadNewSignThread implements Runnable {
                        Thread.sleep(retryTime);
                        continue;
                    }
//                }
//
//                Thread.sleep(sleepTime);
                }
                Thread.sleep(sleepTime);
            } catch (Exception ex) {
                ex.printStackTrace();
                try {

+ 6 - 6
patient-co-service/wlyy_sign/src/main/java/com/yihu/wlyy/sign/common/thread/LoadThread.java

@ -20,7 +20,7 @@ public class LoadThread implements Runnable {
    Integer morningHours = 6;
    //默认线程间隔
    Integer sleepTime = 180;
    Integer sleepTime = 60;
    @Override
    public void run() {
@ -37,21 +37,21 @@ public class LoadThread implements Runnable {
                    if(signZYService!=null && systemDictService!=null)
                    {
                        Boolean running = systemDictService.getLoadRunning();
                        sleepTime = systemDictService.getLoadSleepTime();
//                        sleepTime = systemDictService.getLoadSleepTime();
                        if(running) {
                            System.out.print(DateUtil.dateToStrLong(now) + " 开始采集签约...\r\n");
                            System.out.print(DateUtil.dateToStrLong(now) + " 开始采集签约(更新)...\r\n");
                            try {
                                signZYService.loadSignFamilyThread();
                                signZYService.loadSignFamilyThreadMore();
                            }
                            catch (Exception e)
                            {
                                e.printStackTrace();
                            }
                            System.out.print(DateUtil.dateToStrLong(new Date()) + " 采集签约结束。\r\n");
                            System.out.print(DateUtil.dateToStrLong(new Date()) + " 采集签约(更新)结束。\r\n");
                        }
                        Thread.sleep(sleepTime*1000);//间隔3分钟
                        Thread.sleep(sleepTime*1000);//间隔1分钟
                        continue;
                    }
                    else{

+ 3 - 2
patient-co-service/wlyy_sign/src/main/java/com/yihu/wlyy/sign/common/thread/UploadChargeThread.java

@ -1,5 +1,6 @@
package com.yihu.wlyy.sign.common.thread;
import com.yihu.wlyy.sign.common.util.DateUtil;
import com.yihu.wlyy.sign.common.util.SpringContextHolder;
import com.yihu.wlyy.sign.service.ChargeZYService;
import com.yihu.wlyy.sign.service.SystemDictService;
@ -28,7 +29,7 @@ public class UploadChargeThread implements Runnable{
                {
                    Boolean running = systemDictService.getUploadChargeRunning();
                    if(running) {
//                        System.out.print(DateUtil.dateToStrLong(now) + " 上传缴费...\r\n");
                        System.out.print(DateUtil.dateToStrLong(now) + " 上传缴费...\r\n");
                        try {
                            service.uploadCharge();
                        }
@ -37,7 +38,7 @@ public class UploadChargeThread implements Runnable{
                            e.printStackTrace();
                        }
//                        System.out.print(DateUtil.dateToStrLong(new Date()) + " 上传缴费记录结束。\r\n");
                        System.out.print(DateUtil.dateToStrLong(new Date()) + " 上传缴费记录结束。\r\n");
                    }
                    Thread.sleep(sleepTime);

+ 188 - 3
patient-co-service/wlyy_sign/src/main/java/com/yihu/wlyy/sign/service/SignZYService.java

@ -15,6 +15,10 @@ import org.springframework.stereotype.Service;
import java.io.*;
import java.util.*;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
/**
 * Created by hzp on 2016/9/9.
@ -348,6 +352,7 @@ public class SignZYService {
        try {
            if (map != null && map.containsKey("IDENTITY_CARD_NO")) {
                idcard = map.get("IDENTITY_CARD_NO");
//                System.out.println("开始同步基卫传递过来的签约数据(更新):居民身份证"+idcard);
                proId = map.get("PRO_ID");
                String chargeTime = map.get("CHARGE_TIME");//付款时间
                String flag = map.get("SUBSIDY_FLAG"); //是否补贴
@ -727,7 +732,6 @@ public class SignZYService {
        //签约变更记录表
        saveSignFamilyMapping(code, proId, signYear,idcard, action, isSuccess);
        return isSuccess;
    }
@ -1007,10 +1011,11 @@ public class SignZYService {
        String msg = "";
        try {
            
            if (map != null && map.containsKey("PRO_ID") && map.get("RELEASE_FLAG").equals("2"))
            {
                proId = map.get("PRO_ID");
//                System.out.println("开始同步基卫传递过来的解约数据:基卫pro_id"+proId);
                //通过proid获取code
                SignFamilyMapping mapping = signFamilyMappingDao.findByProId(proId);
                if(mapping==null || StringUtil.isEmpty(mapping.getCode()))
@ -1086,7 +1091,7 @@ public class SignZYService {
        //签约变更记录表
        saveSignFamilyMapping(null, proId, null, idcard, action, isSuccess);
//        System.out.println("开始同步基卫传递过来的解约数据:基卫pro_id"+proId+",同步成功");
        return isSuccess;
    }
@ -1238,6 +1243,39 @@ public class SignZYService {
        }while (end.before(new Date()));
    }
    
    
    /**
     * 【线程】调用批量更新签约数据
     * 多线程同步
     * @throws Exception
     */
    public void loadSignFamilyThreadMore() throws Exception
    {
        Date end = new Date();
        do {
            String startTime =  systemDictService.getLoadSignTime();   //上次执行时间
            Date start = DateUtil.getNextMin(DateUtil.strToDate(startTime), -1);   //重复采集前1分钟
            startTime = DateUtil.dateToStrLong(start);
            
            String endTime = DateUtil.dateToStrLong(new Date());
            
            System.out.println("采集时间:start="+startTime);
            
            end = DateUtil.getNextMin(DateUtil.strToDate(startTime), 15);//采集15分钟后的数据
            
            if(end.before(new Date())){
                endTime = DateUtil.dateToStrLong(end);
            }
            System.out.print(DateUtil.getCurrentString()+ " update sign family start..");
            loadSignFamilyBatchMore(startTime, endTime);
            
            //更新下次采集签约时间
            systemDictService.saveLoadSignTime(endTime);
            
        }while (end.before(new Date()));
    }
    
    /**
     * 【线程】调用批量新增签约数据
@ -1653,6 +1691,153 @@ public class SignZYService {
        }
        return isSuccess;
    }
    
    
    /**
     * 遍历机构更新签约数据   (更新)--多线程同步
     */
    public void loadSignFamilyBatchMore(String startTime,String endTime) throws Exception
    {
        List<HospitalMapping> list = hospitalMappingDao.findJwOrg();
        long total =  0;
        long error = 0;
        if(list!=null && list.size()>0) //获取机构映射
        {
            for(HospitalMapping hm :list)
            {
                try {
                    String hospital = hm.getMappingCode();
                    String licence = hm.getLicence();
                    //获取某个时间段签约数据
                    String response =  zysoftService.getEhrOrgFamilySignInfo(startTime, endTime, hospital, licence,"0");
                    List<Map<String,String>> signList = zysoftService.getJwList(response);
                    
                    long htotal =  0;
                    long herror = 0;
                    if(signList!=null && signList.size()>0)
                    {
                        htotal =  signList.size();
    
                        System.out.print("遍历机构更新签约数据   (更新)"+startTime +"~" + endTime +"【"+hm.getName()+"】总采集"+htotal+"条:开始执行同步"+DateUtil.getCurrentString()+"..\r\n");
                        
                        //数据小于10条的直接走单线程处理
                        if(10 >= htotal){
                            //遍历数据
                            for(Map<String,String> signInfo:signList)
                            {
                                //判断是否解约
                                if(signInfo.get("RELEASE_FLAG")!=null && signInfo.get("RELEASE_FLAG").equals("2"))
                                {
                                    if(!releaseSignFamily(signInfo))
                                    {
                                        herror ++;
                                    }
                                }
                                else{
                                    if(!saveSignFamily(signInfo))
                                    {
                                        herror ++;
                                    }
                                }
                            }
                            total += htotal;
                            error += herror;
                        }else {
                         //数据大于10条的走多线程
                            long start = System.currentTimeMillis();
                            // 总数据条数
                            int dataSize = signList.size();
                            // 每10条数据开启一条线程
                            int threadSize = 10;
                            // 线程数
                            int threadNum = dataSize / threadSize + 1;
                            // 定义标记,过滤threadNum为整数
                            boolean special = dataSize % threadSize == 0;
    
    
                            System.out.print("遍历机构更新签约数据   (更新):线程数"+threadNum+"..\r\n");
    
                            // 创建一个线程池
                            ExecutorService exec = Executors.newFixedThreadPool(threadNum);
    
                            // 定义一个任务集合
                            List<Callable<Integer>> tasks = new ArrayList<Callable<Integer>>();
                            Callable<Integer> task = null;
                            List<Map<String,String>> cutList = null;
                            // 确定每条线程的数据
                            for (int i = 0; i < threadNum; i++) {
                                if (i == threadNum - 1) {
                                    if (special) {
                                        break;
                                    }
                                    cutList = signList.subList(threadSize * i, dataSize);
                                } else {
                                    cutList = signList.subList(threadSize * i, threadSize * (i + 1));
                                }
                                // System.out.println("第" + (i + 1) + "组:" + cutList.toString());
                                final List<Map<String,String>> listStr = cutList;
                                task = new Callable<Integer>() {
                                    @Override
                                    public Integer call() throws Exception {
                                        if(listStr!=null && listStr.size()>0)
                                        {
//                                            htotal =  signList.size();
                                            //遍历数据
                                            for(Map<String,String> signInfo:listStr)
                                            {
                                                //判断是否解约
                                                if(signInfo.get("RELEASE_FLAG")!=null && signInfo.get("RELEASE_FLAG").equals("2"))
                                                {
                                                    if(!releaseSignFamily(signInfo))
                                                    {
                                                        System.out.print("遍历机构更新签约数据   (更新):解约失败..\r\n");
//                                                        herror ++;
                                                    }
                                                }
                                                else{
                                                    if(!saveSignFamily(signInfo))
                                                    {
                                                        System.out.print("遍历机构更新签约数据   (更新):签约失败..\r\n");
//                                                        herror ++;
                                                    }
                                                }
                                            }
//                                            total += htotal;
//                                            error += herror;
                                        }
//                                        System.out.println(Thread.currentThread().getName() + "线程:");
                                        return 1;
                                    }
                                };
                                // 这里提交的任务容器列表和返回的Future列表存在顺序对应的关系
                                tasks.add(task);
                            }
                            List<Future<Integer>> results = exec.invokeAll(tasks);
                            for (Future<Integer> future : results) {
                                System.out.println(future.get());
                            }
                            // 关闭线程池
                            exec.shutdown();
                            System.out.println("线程任务执行结束");
                            System.err.println("执行任务消耗了 :" + (System.currentTimeMillis() - start) + "毫秒");
                         
                        }
                    }
                    System.out.print("遍历机构更新签约数据   (更新)"+startTime +"~" + endTime +"【"+hm.getName()+"】总采集"+htotal+"条,失败"+herror+"条。..\r\n");
                }
                catch (Exception e)
                {
                    System.out.print("遍历机构更新签约数据   (更新)"+startTime +"~" + endTime +"【"+hm.getName()+"】全部采集失败。..\r\n");
                }
            }
            
        }
        if(error != 0)
        {
            System.out.print("遍历机构更新签约数据   (更新)"+startTime +"~" + endTime +"批量更新签约数据,总条数"+total+"条,失败"+error+"条。..\r\n");
        }
    }
    /**

+ 3 - 3
patient-co/patient-co-wlyy/doc/技术文档/es/回访问卷新增索引.txt

@ -1,9 +1,9 @@
创建索引
POST  http://172.19.103.68:9200/wlyy_questionnaire_winning
POST  http://27.155.100.191:9200//wlyy_questionnaire_winning
查询索引
GET http://172.19.103.68:9200/wlyy_questionnaire_winning/
GET http://27.155.100.191:9200/wlyy_questionnaire_winning/
给索引加mapping
POST http://172.19.103.68:9200/wlyy_questionnaire_winning/wlyy_questionnaire_winning/_mapping
POST http://27.155.100.191:9200/wlyy_questionnaire_winning/wlyy_questionnaire_winning/_mapping
{
    "wlyy_questionnaire_winning": {
        "properties": {