|  | @ -1,5 +1,6 @@
 | 
	
		
			
				|  |  | package com.yihu.ehr.datacollect.service;
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  | import com.fasterxml.jackson.databind.JavaType;
 | 
	
		
			
				|  |  | import com.fasterxml.jackson.databind.ObjectMapper;
 | 
	
		
			
				|  |  | import com.yihu.ehr.common.Services;
 | 
	
		
			
				|  |  | import com.yihu.ehr.datacollect.dao.intf.IDatacollectDao;
 | 
	
	
		
			
				|  | @ -14,6 +15,8 @@ import com.yihu.ehr.dbhelper.jdbc.DBHelper;
 | 
	
		
			
				|  |  | import com.yihu.ehr.dbhelper.mongodb.MongodbHelper;
 | 
	
		
			
				|  |  | import com.yihu.ehr.framework.constrant.DateConvert;
 | 
	
		
			
				|  |  | import com.yihu.ehr.framework.model.ActionResult;
 | 
	
		
			
				|  |  | import com.yihu.ehr.framework.util.httpclient.HttpHelper;
 | 
	
		
			
				|  |  | import com.yihu.ehr.framework.util.httpclient.HttpResponse;
 | 
	
		
			
				|  |  | import com.yihu.ehr.framework.util.log.LogService;
 | 
	
		
			
				|  |  | import com.yihu.ehr.resource.service.IStdService;
 | 
	
		
			
				|  |  | import org.json.JSONObject;
 | 
	
	
		
			
				|  | @ -22,10 +25,7 @@ import org.springframework.stereotype.Service;
 | 
	
		
			
				|  |  | import org.springframework.transaction.annotation.Transactional;
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  | import javax.annotation.Resource;
 | 
	
		
			
				|  |  | import java.util.ArrayList;
 | 
	
		
			
				|  |  | import java.util.Date;
 | 
	
		
			
				|  |  | import java.util.List;
 | 
	
		
			
				|  |  | import java.util.Map;
 | 
	
		
			
				|  |  | import java.util.*;
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  | /**
 | 
	
		
			
				|  |  |  * 数据采集执行服务
 | 
	
	
		
			
				|  | @ -234,6 +234,41 @@ public class DatacollectService implements IDatacollectService {
 | 
	
		
			
				|  |  |         return "";
 | 
	
		
			
				|  |  |     }
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  |     /**
 | 
	
		
			
				|  |  |      * 采集入库
 | 
	
		
			
				|  |  |      * @return
 | 
	
		
			
				|  |  |      */
 | 
	
		
			
				|  |  |     private String intoMongodb(List<JSONObject> list,String schemeVersion,String stdDatasetCode,JSONArray colList)
 | 
	
		
			
				|  |  |     {
 | 
	
		
			
				|  |  |         try{
 | 
	
		
			
				|  |  |             if(list!=null && list.size()>0)
 | 
	
		
			
				|  |  |             {
 | 
	
		
			
				|  |  |                 //字典转换
 | 
	
		
			
				|  |  |                 list = translateDict(list, colList,schemeVersion);
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  |                 //采集到mongodb
 | 
	
		
			
				|  |  |                 boolean b = mongo.insert(stdDatasetCode,list);
 | 
	
		
			
				|  |  |                 if(!b)
 | 
	
		
			
				|  |  |                 {
 | 
	
		
			
				|  |  |                     if(mongo.errorMessage!=null && mongo.errorMessage.length()>0)
 | 
	
		
			
				|  |  |                     {
 | 
	
		
			
				|  |  |                         System.out.print(mongo.errorMessage);
 | 
	
		
			
				|  |  |                         return mongo.errorMessage;
 | 
	
		
			
				|  |  |                     }
 | 
	
		
			
				|  |  |                     else {
 | 
	
		
			
				|  |  |                         return "Mongodb保存失败!(表:"+stdDatasetCode+")";
 | 
	
		
			
				|  |  |                     }
 | 
	
		
			
				|  |  |                 }
 | 
	
		
			
				|  |  |             }
 | 
	
		
			
				|  |  |         }
 | 
	
		
			
				|  |  |         catch (Exception e)
 | 
	
		
			
				|  |  |         {
 | 
	
		
			
				|  |  |             return e.getMessage();
 | 
	
		
			
				|  |  |         }
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  |         return "";
 | 
	
		
			
				|  |  |     }
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  |     /**
 | 
	
		
			
				|  |  |      * 数据库表采集
 | 
	
		
			
				|  |  |      * @return
 | 
	
	
		
			
				|  | @ -428,45 +463,197 @@ public class DatacollectService implements IDatacollectService {
 | 
	
		
			
				|  |  |     }
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  |     /**
 | 
	
		
			
				|  |  |      * 采集入库
 | 
	
		
			
				|  |  |      * @return
 | 
	
		
			
				|  |  |      * webservice采集
 | 
	
		
			
				|  |  |      */
 | 
	
		
			
				|  |  |     private String intoMongodb(List<JSONObject> list,String schemeVersion,String stdDatasetCode,JSONArray colList)
 | 
	
		
			
				|  |  |     private String collectWebservice(DtoJobDataset ds,String schemeVersion,String logId) throws Exception
 | 
	
		
			
				|  |  |     {
 | 
	
		
			
				|  |  |         try{
 | 
	
		
			
				|  |  |             if(list!=null && list.size()>0)
 | 
	
		
			
				|  |  |             {
 | 
	
		
			
				|  |  |                 //字典转换
 | 
	
		
			
				|  |  |                 list = translateDict(list, colList,schemeVersion);
 | 
	
		
			
				|  |  |         String message = "";
 | 
	
		
			
				|  |  |         String datasetId = ds.getJobDatasetId();
 | 
	
		
			
				|  |  |         String jobDatasetName = ds.getJobDatasetName();
 | 
	
		
			
				|  |  |         String condition=ds.getJobDatasetCondition();
 | 
	
		
			
				|  |  |         String key=ds.getJobDatasetKey();
 | 
	
		
			
				|  |  |         String keytype=ds.getJobDatasetKeytype();
 | 
	
		
			
				|  |  |         String keyvalue=ds.getJobDatasetKeyvalue();
 | 
	
		
			
				|  |  |         String orgCode = ds.getOrgCode();
 | 
	
		
			
				|  |  |         String config = ds.getConfig();
 | 
	
		
			
				|  |  |         String datasourceId = ds.getDatasourceId();
 | 
	
		
			
				|  |  |         //webservice地址
 | 
	
		
			
				|  |  |         ObjectMapper objectMapper = new ObjectMapper();
 | 
	
		
			
				|  |  |         Map<String,String> mapConfig = objectMapper.readValue(config,Map.class);
 | 
	
		
			
				|  |  |         if(mapConfig.containsKey("protocol") && mapConfig.containsKey("url"))
 | 
	
		
			
				|  |  |         {
 | 
	
		
			
				|  |  |             String url = mapConfig.get("protocol") + "://" + mapConfig.get("url");
 | 
	
		
			
				|  |  |             //获取数据集映射
 | 
	
		
			
				|  |  |             List datasetString = stdService.getDatasetByScheme(schemeVersion, datasetId);
 | 
	
		
			
				|  |  |             JSONArray datasetList = new JSONArray(datasetString);
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  |                 //采集到mongodb
 | 
	
		
			
				|  |  |                 boolean b = mongo.insert(stdDatasetCode,list);
 | 
	
		
			
				|  |  |                 if(!b)
 | 
	
		
			
				|  |  |                 {
 | 
	
		
			
				|  |  |                     if(mongo.errorMessage!=null && mongo.errorMessage.length()>0)
 | 
	
		
			
				|  |  |             if(datasetList!=null &&datasetList.length()>0)
 | 
	
		
			
				|  |  |             {
 | 
	
		
			
				|  |  |                 String stdTableName = datasetList.getJSONObject(0).optString("stdDatasetCode");
 | 
	
		
			
				|  |  |                 String adapterTableName = datasetList.getJSONObject(0).optString("adapterDatasetCode");
 | 
	
		
			
				|  |  |                 //获取数据集字段映射结构
 | 
	
		
			
				|  |  |                 List colString = stdService.getDatacolByScheme(schemeVersion,datasetId);
 | 
	
		
			
				|  |  |                 JSONArray colList = new JSONArray(colString);
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  |                 if(colList!=null && colList.length()>0) {
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  |                     Map<String,Object> params = new HashMap<>();
 | 
	
		
			
				|  |  |                     String where = "";
 | 
	
		
			
				|  |  |                     params.put("table",adapterTableName);//表名
 | 
	
		
			
				|  |  |                     String paramsStr = "";
 | 
	
		
			
				|  |  |                     //设置最大值
 | 
	
		
			
				|  |  |                     if(keyvalue!=null && keyvalue.length()>0)
 | 
	
		
			
				|  |  |                     {
 | 
	
		
			
				|  |  |                         System.out.print(mongo.errorMessage);
 | 
	
		
			
				|  |  |                         return mongo.errorMessage;
 | 
	
		
			
				|  |  |                         if(condition.length()>0 && !condition.equals("[]"))
 | 
	
		
			
				|  |  |                         {
 | 
	
		
			
				|  |  |                             paramsStr = condition.substring(0,condition.length()-1) + ",{\"andOr\":\" AND \",\"field\":\""+key+"\",\"condition\":\" > \",\"value\":\""+keyvalue+"\"}]";
 | 
	
		
			
				|  |  |                         }
 | 
	
		
			
				|  |  |                         else{
 | 
	
		
			
				|  |  |                             paramsStr = "[{\"andOr\":\" AND \",\"field\":\""+key+"\",\"condition\":\" > \",\"value\":\""+keyvalue+"\"}]";
 | 
	
		
			
				|  |  |                         }
 | 
	
		
			
				|  |  |                     }
 | 
	
		
			
				|  |  |                     else {
 | 
	
		
			
				|  |  |                         return "Mongodb保存失败!(表:"+stdDatasetCode+")";
 | 
	
		
			
				|  |  |                     else{
 | 
	
		
			
				|  |  |                         paramsStr = condition;
 | 
	
		
			
				|  |  |                     }
 | 
	
		
			
				|  |  |                     params.put("params",paramsStr);
 | 
	
		
			
				|  |  |                     params.put("key",key);
 | 
	
		
			
				|  |  |                     params.put("keytype",keytype);
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  |                     //获取总条数和最大值
 | 
	
		
			
				|  |  |                     HttpResponse responseCount = HttpHelper.get(url,params);
 | 
	
		
			
				|  |  |                     Map<String,String> mapCount = objectMapper.readValue(responseCount.getBody(),Map.class);
 | 
	
		
			
				|  |  |                     if(responseCount.getStatusCode() == 200 && mapCount.containsKey("count"))
 | 
	
		
			
				|  |  |                     {
 | 
	
		
			
				|  |  |                         Integer count =  Integer.parseInt(mapCount.get("count"));
 | 
	
		
			
				|  |  |                         if(count == 0) //0条记录,无需采集
 | 
	
		
			
				|  |  |                         {
 | 
	
		
			
				|  |  |                             message = "0条记录,无需采集。";
 | 
	
		
			
				|  |  |                         }
 | 
	
		
			
				|  |  |                         else {
 | 
	
		
			
				|  |  |                             params.remove("key");
 | 
	
		
			
				|  |  |                             params.remove("keytype");
 | 
	
		
			
				|  |  |                             int successCount = 0;
 | 
	
		
			
				|  |  |                             String maxKeyvalue = mapCount.get("maxKeyvalue");
 | 
	
		
			
				|  |  |                             int countPage = 1;
 | 
	
		
			
				|  |  |                             if(count > maxNum) //分页采集
 | 
	
		
			
				|  |  |                             {
 | 
	
		
			
				|  |  |                                 countPage = count/maxNum+1;
 | 
	
		
			
				|  |  |                             }
 | 
	
		
			
				|  |  |                             for(int i=0;i<countPage;i++)
 | 
	
		
			
				|  |  |                             {
 | 
	
		
			
				|  |  |                                 int rows = maxNum;
 | 
	
		
			
				|  |  |                                 if(i+1==countPage){
 | 
	
		
			
				|  |  |                                     rows = count-i*maxNum;
 | 
	
		
			
				|  |  |                                 }
 | 
	
		
			
				|  |  |                                 int start = i*maxNum; //开始行数
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  |                                 RsJobLogDetail detail = new RsJobLogDetail();
 | 
	
		
			
				|  |  |                                 detail.setStartTime(new Date());
 | 
	
		
			
				|  |  |                                 detail.setJobLogId(logId);
 | 
	
		
			
				|  |  |                                 detail.setDatasourceId(datasourceId);
 | 
	
		
			
				|  |  |                                 detail.setConfig(config);
 | 
	
		
			
				|  |  |                                 detail.setStdDatasetCode(stdTableName);
 | 
	
		
			
				|  |  |                                 detail.setJobDatasetId(datasetId);
 | 
	
		
			
				|  |  |                                 detail.setJobDatasetName(ds.getJobDatasetName());
 | 
	
		
			
				|  |  |                                 detail.setJobId(ds.getJobId());
 | 
	
		
			
				|  |  |                                 detail.setJobSql("{table:\""+adapterTableName+"\",where:\""+where+"\",start:\""+start+"\",rows:\""+rows+"\"}");
 | 
	
		
			
				|  |  |                                 detail.setJobNum(i+1);
 | 
	
		
			
				|  |  |                                 detail.setJobDatasetRows(rows);
 | 
	
		
			
				|  |  |                                 detail.setSchemeVersion(schemeVersion);
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  |                                 //分页获取数据
 | 
	
		
			
				|  |  |                                 params.put("start",start);
 | 
	
		
			
				|  |  |                                 params.put("rows",rows);
 | 
	
		
			
				|  |  |                                 HttpResponse responseList = HttpHelper.get(url,params);
 | 
	
		
			
				|  |  |                                 String msg = "";
 | 
	
		
			
				|  |  |                                 if(responseList.getStatusCode() == 200) {
 | 
	
		
			
				|  |  |                                     JavaType javaType = objectMapper.getTypeFactory().constructParametricType(List.class, JSONObject.class);
 | 
	
		
			
				|  |  |                                     List<JSONObject> list = objectMapper.readValue(responseList.getBody(), javaType);
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  |                                     if (list != null) {
 | 
	
		
			
				|  |  |                                         List<JSONObject> listNew = new ArrayList<>();
 | 
	
		
			
				|  |  |                                         //遍历所有数据,转译数据元代码
 | 
	
		
			
				|  |  |                                         for(JSONObject obj : list)
 | 
	
		
			
				|  |  |                                         {
 | 
	
		
			
				|  |  |                                             JSONObject objNew = new JSONObject();
 | 
	
		
			
				|  |  |                                             //转译字段名
 | 
	
		
			
				|  |  |                                             for(int j=0; j< colList.length();j++)
 | 
	
		
			
				|  |  |                                             {
 | 
	
		
			
				|  |  |                                                 JSONObject col = colList.getJSONObject(j);
 | 
	
		
			
				|  |  |                                                 String adapterMetadataCode = col.optString("adapterMetadataCode");
 | 
	
		
			
				|  |  |                                                 String stdMetadataCode = col.optString("stdMetadataCode");
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  |                                                 if(obj.has(adapterMetadataCode))
 | 
	
		
			
				|  |  |                                                 {
 | 
	
		
			
				|  |  |                                                     objNew.put(stdMetadataCode,obj.get(adapterMetadataCode));
 | 
	
		
			
				|  |  |                                                 }
 | 
	
		
			
				|  |  |                                             }
 | 
	
		
			
				|  |  |                                             listNew.add(objNew);
 | 
	
		
			
				|  |  |                                         }
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  |                                         msg = intoMongodb(listNew, schemeVersion, stdTableName, colList); //返回信息
 | 
	
		
			
				|  |  |                                     } else {
 | 
	
		
			
				|  |  |                                         msg = "查询数据为空!";
 | 
	
		
			
				|  |  |                                     }
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  |                                 }
 | 
	
		
			
				|  |  |                                 else{
 | 
	
		
			
				|  |  |                                     msg = responseList.getBody();
 | 
	
		
			
				|  |  |                                     if(msg.length()==0)
 | 
	
		
			
				|  |  |                                     {
 | 
	
		
			
				|  |  |                                         msg = "获取数据失败!";
 | 
	
		
			
				|  |  |                                     }
 | 
	
		
			
				|  |  |                                 }
 | 
	
		
			
				|  |  |                                 if(msg.length()>0)
 | 
	
		
			
				|  |  |                                 {
 | 
	
		
			
				|  |  |                                     //任务日志细表异常操作
 | 
	
		
			
				|  |  |                                     detail.setJobStatus("0");
 | 
	
		
			
				|  |  |                                     detail.setJobContent(msg);
 | 
	
		
			
				|  |  |                                     LogService.getLogger().info(msg); //文本日志
 | 
	
		
			
				|  |  |                                 }
 | 
	
		
			
				|  |  |                                 else{
 | 
	
		
			
				|  |  |                                     detail.setJobStatus("1");
 | 
	
		
			
				|  |  |                                     detail.setJobContent("采集成功!");
 | 
	
		
			
				|  |  |                                     successCount += rows;
 | 
	
		
			
				|  |  |                                 }
 | 
	
		
			
				|  |  |                                 detail.setEndTime(new Date());
 | 
	
		
			
				|  |  |                                 datacollectLogDao.saveEntity(detail);
 | 
	
		
			
				|  |  |                             }
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  |                             //修改最大值
 | 
	
		
			
				|  |  |                             if(maxKeyvalue!=null&& maxKeyvalue.length()>0)
 | 
	
		
			
				|  |  |                             {
 | 
	
		
			
				|  |  |                                 datacollectLogDao.updateJobDatasetKeyvalue(ds.getId(),maxKeyvalue);
 | 
	
		
			
				|  |  |                                 LogService.getLogger().info("修改任务数据集最大值为"+maxKeyvalue+"。"); //文本日志
 | 
	
		
			
				|  |  |                             }
 | 
	
		
			
				|  |  |                             message = jobDatasetName + "采集成功"+successCount+"条数据,总条数"+count+"条。";
 | 
	
		
			
				|  |  |                         }
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  |                     }
 | 
	
		
			
				|  |  |                     else{
 | 
	
		
			
				|  |  |                         throw new Exception(jobDatasetName + "获取总条数异常!"+responseCount.getBody());
 | 
	
		
			
				|  |  |                     }
 | 
	
		
			
				|  |  |                 }
 | 
	
		
			
				|  |  |                 else
 | 
	
		
			
				|  |  |                 {
 | 
	
		
			
				|  |  |                     throw new Exception(jobDatasetName + "数据集字段映射为空!");
 | 
	
		
			
				|  |  |                 }
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  |             }
 | 
	
		
			
				|  |  |             else{
 | 
	
		
			
				|  |  |                 throw new Exception(jobDatasetName + "数据集映射为空!");
 | 
	
		
			
				|  |  |             }
 | 
	
		
			
				|  |  |         }
 | 
	
		
			
				|  |  |         catch (Exception e)
 | 
	
		
			
				|  |  |         {
 | 
	
		
			
				|  |  |             return e.getMessage();
 | 
	
		
			
				|  |  |         else{
 | 
	
		
			
				|  |  |             throw new Exception("非法webservice路径!");
 | 
	
		
			
				|  |  |         }
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  |         return "";
 | 
	
		
			
				|  |  |         LogService.getLogger().info(message);
 | 
	
		
			
				|  |  |         return message;
 | 
	
		
			
				|  |  |     }
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  |     /**
 | 
	
		
			
				|  |  |      * 执行任务
 | 
	
		
			
				|  |  |      */
 | 
	
	
		
			
				|  | @ -504,7 +691,7 @@ public class DatacollectService implements IDatacollectService {
 | 
	
		
			
				|  |  |                         if (type != null) {
 | 
	
		
			
				|  |  |                             if (type.equals("1")) //Web Service
 | 
	
		
			
				|  |  |                             {
 | 
	
		
			
				|  |  |                                 message = "Web Service采集。\n";
 | 
	
		
			
				|  |  |                                 message = collectWebservice(ds, schemeVersion, logId) + "\n";
 | 
	
		
			
				|  |  |                             } else if (type.equals("2"))//文件系统
 | 
	
		
			
				|  |  |                             {
 | 
	
		
			
				|  |  |                                 message = "文件系统采集。\n";
 |