|  | @ -0,0 +1,1166 @@
 | 
	
		
			
				|  |  | package com.yihu.hos.rest.services.crawler;
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  | import com.fasterxml.jackson.databind.ObjectMapper;
 | 
	
		
			
				|  |  | import com.yihu.ehr.dbhelper.common.QueryCondition;
 | 
	
		
			
				|  |  | import com.yihu.ehr.dbhelper.common.enums.DBType;
 | 
	
		
			
				|  |  | import com.yihu.ehr.dbhelper.common.sqlparser.ParserMysql;
 | 
	
		
			
				|  |  | import com.yihu.ehr.dbhelper.common.sqlparser.ParserOracle;
 | 
	
		
			
				|  |  | import com.yihu.ehr.dbhelper.common.sqlparser.ParserSql;
 | 
	
		
			
				|  |  | import com.yihu.ehr.dbhelper.common.sqlparser.ParserSqlserver;
 | 
	
		
			
				|  |  | import com.yihu.ehr.dbhelper.jdbc.DBHelper;
 | 
	
		
			
				|  |  | import com.yihu.ehr.dbhelper.mongodb.MongodbHelper;
 | 
	
		
			
				|  |  | import com.yihu.hos.core.log.Logger;
 | 
	
		
			
				|  |  | import com.yihu.hos.core.log.LoggerFactory;
 | 
	
		
			
				|  |  | import com.yihu.hos.rest.common.dao.DatacollectDao;
 | 
	
		
			
				|  |  | import com.yihu.hos.rest.common.dao.DatacollectLogDao;
 | 
	
		
			
				|  |  | import com.yihu.hos.rest.models.crawler.config.SysConfig;
 | 
	
		
			
				|  |  | import com.yihu.hos.rest.models.crawler.patient.PatientIdentity;
 | 
	
		
			
				|  |  | import com.yihu.hos.rest.models.rs.*;
 | 
	
		
			
				|  |  | import com.yihu.hos.rest.services.standard.StdService;
 | 
	
		
			
				|  |  | import com.yihu.hos.web.framework.constrant.DateConvert;
 | 
	
		
			
				|  |  | import com.yihu.hos.web.framework.constrant.SqlConstants;
 | 
	
		
			
				|  |  | import com.yihu.hos.web.framework.model.ActionResult;
 | 
	
		
			
				|  |  | import com.yihu.hos.web.framework.util.GridFSUtil;
 | 
	
		
			
				|  |  | import org.bson.types.ObjectId;
 | 
	
		
			
				|  |  | import org.dom4j.Document;
 | 
	
		
			
				|  |  | import org.dom4j.Element;
 | 
	
		
			
				|  |  | import org.dom4j.io.SAXReader;
 | 
	
		
			
				|  |  | import org.json.JSONArray;
 | 
	
		
			
				|  |  | import org.json.JSONObject;
 | 
	
		
			
				|  |  | import org.springframework.beans.factory.annotation.Autowired;
 | 
	
		
			
				|  |  | import org.springframework.stereotype.Service;
 | 
	
		
			
				|  |  | import org.springframework.transaction.annotation.Transactional;
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  | import javax.annotation.Resource;
 | 
	
		
			
				|  |  | import java.io.ByteArrayInputStream;
 | 
	
		
			
				|  |  | import java.sql.Blob;
 | 
	
		
			
				|  |  | import java.text.ParseException;
 | 
	
		
			
				|  |  | import java.text.SimpleDateFormat;
 | 
	
		
			
				|  |  | import java.util.*;
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  | /**
 | 
	
		
			
				|  |  |  * 数据采集执行服务
 | 
	
		
			
				|  |  |  */
 | 
	
		
			
				|  |  | @Service("DatacollectService")
 | 
	
		
			
				|  |  | public class DatacollectService {
 | 
	
		
			
				|  |  |     static private final Logger logger = LoggerFactory.getLogger(DatacollectService.class);
 | 
	
		
			
				|  |  |     MongodbHelper mongoOrigin = new MongodbHelper("origin");
 | 
	
		
			
				|  |  |     MongodbHelper mongo = new MongodbHelper();
 | 
	
		
			
				|  |  |     String dateFormat = "yyyy-MM-dd HH:mm:ss"; //默认时间字符串格式
 | 
	
		
			
				|  |  |     int maxNum = 1000; //查询条数限制
 | 
	
		
			
				|  |  |     @Resource(name = "DatacollectManager")
 | 
	
		
			
				|  |  |     private DatacollectManager datacollect;
 | 
	
		
			
				|  |  |     @Resource(name = "StdService")
 | 
	
		
			
				|  |  |     private StdService stdService;
 | 
	
		
			
				|  |  |     @Resource(name = "DatacollectDao")
 | 
	
		
			
				|  |  |     private DatacollectDao datacollectDao;
 | 
	
		
			
				|  |  |     @Resource(name = "DatacollectLogDao")
 | 
	
		
			
				|  |  |     private DatacollectLogDao datacollectLogDao;
 | 
	
		
			
				|  |  |     @Autowired
 | 
	
		
			
				|  |  |     private ObjectMapper objectMapper;
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  |     /**
 | 
	
		
			
				|  |  |      * 根据连接字符串获取数据库类型
 | 
	
		
			
				|  |  |      */
 | 
	
		
			
				|  |  |     private static DBType getDbType(String uri) {
 | 
	
		
			
				|  |  |         return uri.startsWith("jdbc:mysql") ? DBType.Mysql : (uri.startsWith("jdbc:oracle") ? DBType.Oracle : (uri.startsWith("jdbc:hive2") ? DBType.Hive : (uri.startsWith("jdbc:microsoft:sqlserver") ? DBType.Sqlserver : DBType.Mysql)));
 | 
	
		
			
				|  |  |     }
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  |     /**
 | 
	
		
			
				|  |  |      * 执行任务
 | 
	
		
			
				|  |  |      */
 | 
	
		
			
				|  |  |     public Boolean collectByJobId(String jobId) throws Exception {
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  |         //获取任务详细信息
 | 
	
		
			
				|  |  |         RsJobConfig job = datacollect.getJobById(jobId);
 | 
	
		
			
				|  |  |         RsJobLog log = new RsJobLog();
 | 
	
		
			
				|  |  |         log.setJobId(jobId);
 | 
	
		
			
				|  |  |         log.setJobStartTime(new Date());
 | 
	
		
			
				|  |  |         datacollectLogDao.saveEntity(log);
 | 
	
		
			
				|  |  |         String logId = log.getId();
 | 
	
		
			
				|  |  |         logger.info("任务" + jobId + "开始采集,新增日志" + logId + "。");
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  |         StringBuilder logStr = new StringBuilder();
 | 
	
		
			
				|  |  |         int count = 0;
 | 
	
		
			
				|  |  |         int success = 0;
 | 
	
		
			
				|  |  |         Boolean flag = true;
 | 
	
		
			
				|  |  |         try {
 | 
	
		
			
				|  |  |             String schemeVersion = job.getSchemeVersion();
 | 
	
		
			
				|  |  |             //获取任务相关数据集
 | 
	
		
			
				|  |  |             List<DtoJobDataset> list = datacollectDao.getDatacollectDataset(jobId);
 | 
	
		
			
				|  |  |             logger.info("获取任务相关数据集,数量" + list.size() + "。");
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  |             if (list != null && list.size() > 0) {
 | 
	
		
			
				|  |  |                 count = list.size();
 | 
	
		
			
				|  |  |                 logStr.append("/*********** 开始采集 *******************/\n");
 | 
	
		
			
				|  |  |                 //遍历数据集
 | 
	
		
			
				|  |  |                 for (DtoJobDataset ds : list) {
 | 
	
		
			
				|  |  |                     try {
 | 
	
		
			
				|  |  |                         String type = ds.getType();
 | 
	
		
			
				|  |  |                         String message = "";
 | 
	
		
			
				|  |  |                         logStr.append(DateConvert.toString(new Date(), dateFormat) + " " + ds.getJobDatasetName());
 | 
	
		
			
				|  |  |                         if (type != null) {
 | 
	
		
			
				|  |  |                             if (type.equals("1")) //Web Service
 | 
	
		
			
				|  |  |                             {
 | 
	
		
			
				|  |  |                                 message = collectWebservice(ds, schemeVersion, logId) + "\n";
 | 
	
		
			
				|  |  |                             } else if (type.equals("2"))//文件系统
 | 
	
		
			
				|  |  |                             {
 | 
	
		
			
				|  |  |                                 message = "文件系统采集。\n";
 | 
	
		
			
				|  |  |                             } else { //数据库
 | 
	
		
			
				|  |  |                                 message = collectTable(ds, schemeVersion, logId) + "\n";
 | 
	
		
			
				|  |  |                             }
 | 
	
		
			
				|  |  |                         } else {
 | 
	
		
			
				|  |  |                             message = ds.getJobDatasetName() + "未关联数据源!\n";
 | 
	
		
			
				|  |  |                         }
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  |                         logger.info(message); //文本日志
 | 
	
		
			
				|  |  |                         logStr.append(message);
 | 
	
		
			
				|  |  |                         success++;
 | 
	
		
			
				|  |  |                     } catch (Exception ex) {
 | 
	
		
			
				|  |  |                         logger.info("异常:" + ex.getMessage());
 | 
	
		
			
				|  |  |                         logStr.append(ex.getMessage() + "\n");
 | 
	
		
			
				|  |  |                     }
 | 
	
		
			
				|  |  |                 }
 | 
	
		
			
				|  |  |                 logStr.append("/*********** 结束采集 *******************/\n");
 | 
	
		
			
				|  |  |             }
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  |         } catch (Exception ex) {
 | 
	
		
			
				|  |  |             flag = false;
 | 
	
		
			
				|  |  |             ex.printStackTrace();
 | 
	
		
			
				|  |  |             logger.info("异常:" + ex.getMessage());
 | 
	
		
			
				|  |  |             logStr.append(ex.getMessage() + "\n");
 | 
	
		
			
				|  |  |             logStr.append("/*********** 出现异常,中断采集 *******************/\n");
 | 
	
		
			
				|  |  |         }
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  |         //任务主日志成功
 | 
	
		
			
				|  |  |         String jobContent = logStr.toString().replace("\"", "\\\"");
 | 
	
		
			
				|  |  |         if (jobContent.length() > 4000) {
 | 
	
		
			
				|  |  |             jobContent = jobContent.substring(0, 4000);
 | 
	
		
			
				|  |  |         }
 | 
	
		
			
				|  |  |         log.setJobContent(jobContent);
 | 
	
		
			
				|  |  |         log.setJobEndTime(new Date());
 | 
	
		
			
				|  |  |         log.setJobDatasetCount(count);
 | 
	
		
			
				|  |  |         log.setJobDatasetSuccess(success);
 | 
	
		
			
				|  |  |         logger.info("任务结束," + count + "个数据集成功采集" + success + "个。");
 | 
	
		
			
				|  |  |         datacollectLogDao.updateEntity(log);
 | 
	
		
			
				|  |  |         return flag;
 | 
	
		
			
				|  |  |     }
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  |     /**
 | 
	
		
			
				|  |  |      * 根据日志详细补采数据
 | 
	
		
			
				|  |  |      */
 | 
	
		
			
				|  |  |     @Transactional
 | 
	
		
			
				|  |  |     public ActionResult repeatJob(String id) throws Exception {
 | 
	
		
			
				|  |  |         RsJobLogDetail log = datacollectLogDao.getEntity(RsJobLogDetail.class, id);
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  |         if (log.getJobStatus().equals("2")) {
 | 
	
		
			
				|  |  |             return new ActionResult(false, "数据补采中!");
 | 
	
		
			
				|  |  |         }
 | 
	
		
			
				|  |  |         if (!log.getJobStatus().equals("0")) {
 | 
	
		
			
				|  |  |             return new ActionResult(false, "数据无需补采!");
 | 
	
		
			
				|  |  |         }
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  |         try {
 | 
	
		
			
				|  |  |             log.setRepeatStartTime(new Date());
 | 
	
		
			
				|  |  |             log.setJobStatus("2"); //设置采集中状态
 | 
	
		
			
				|  |  |             datacollectLogDao.updateEntity(log);
 | 
	
		
			
				|  |  |         } catch (Exception e) {
 | 
	
		
			
				|  |  |             return new ActionResult(false, "补采失败!");
 | 
	
		
			
				|  |  |         }
 | 
	
		
			
				|  |  |         log.setJobStatus("0");
 | 
	
		
			
				|  |  |         datacollectLogDao.updateEntity(log);
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  |         String stdDatasetCode = log.getStdDatasetCode();
 | 
	
		
			
				|  |  |         String sql = log.getJobSql();
 | 
	
		
			
				|  |  |         //数据库连接
 | 
	
		
			
				|  |  |         String datasourceId = log.getDatasourceId();
 | 
	
		
			
				|  |  |         String config = log.getConfig();
 | 
	
		
			
				|  |  |         DBHelper db = new DBHelper(datasourceId, config);
 | 
	
		
			
				|  |  |         //获取数据集字段映射结构
 | 
	
		
			
				|  |  |         String schemeVersion = log.getSchemeVersion();
 | 
	
		
			
				|  |  |         String datasetId = log.getJobDatasetId();
 | 
	
		
			
				|  |  |         List colString = stdService.getDatacolByScheme(schemeVersion, datasetId);
 | 
	
		
			
				|  |  |         JSONArray colList = new JSONArray(colString);
 | 
	
		
			
				|  |  |         List<JSONObject> list = db.query(sql);
 | 
	
		
			
				|  |  |         String message = intoMongodb(list, schemeVersion, stdDatasetCode, colList);
 | 
	
		
			
				|  |  |         if (message.length() > 0 || db.errorMessage.length() > 0) {
 | 
	
		
			
				|  |  |             log.setJobStatus("0");
 | 
	
		
			
				|  |  |             log.setRepeatEndTime(new Date());
 | 
	
		
			
				|  |  |             if (message.length() > 0) {
 | 
	
		
			
				|  |  |                 log.setRepeatJobContent(message);
 | 
	
		
			
				|  |  |             } else {
 | 
	
		
			
				|  |  |                 db.errorMessage.length();
 | 
	
		
			
				|  |  |             }
 | 
	
		
			
				|  |  |             datacollectLogDao.updateEntity(log);
 | 
	
		
			
				|  |  |             return new ActionResult(false, "补采失败!");
 | 
	
		
			
				|  |  |         } else {
 | 
	
		
			
				|  |  |             log.setJobStatus("3");
 | 
	
		
			
				|  |  |             log.setRepeatEndTime(new Date());
 | 
	
		
			
				|  |  |             log.setRepeatJobContent("补采成功!");
 | 
	
		
			
				|  |  |             datacollectLogDao.updateEntity(log);
 | 
	
		
			
				|  |  |             return new ActionResult(true, "补采成功!");
 | 
	
		
			
				|  |  |         }
 | 
	
		
			
				|  |  |     }
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  |     /**
 | 
	
		
			
				|  |  |      * 根据数据库类型获取时间sql
 | 
	
		
			
				|  |  |      *
 | 
	
		
			
				|  |  |      * @return
 | 
	
		
			
				|  |  |      */
 | 
	
		
			
				|  |  |     private String getDateSqlByDBType(DBType dbType, Date date) throws Exception {
 | 
	
		
			
				|  |  |         String val = DateConvert.toString(date, dateFormat);
 | 
	
		
			
				|  |  |         if (dbType.equals(DBType.Mysql)) {
 | 
	
		
			
				|  |  |             return "date_format(\'" + val + "\',\'" + dateFormat + "\')";
 | 
	
		
			
				|  |  |         } else if (dbType.equals(DBType.Oracle)) {
 | 
	
		
			
				|  |  |             return "to_date(\'" + val + "\',\'" + dateFormat + "\')";
 | 
	
		
			
				|  |  |         } else {
 | 
	
		
			
				|  |  |             return val;
 | 
	
		
			
				|  |  |         }
 | 
	
		
			
				|  |  |     }
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  |     /**
 | 
	
		
			
				|  |  |      * 根据数据库类型获取转换数值型sql
 | 
	
		
			
				|  |  |      */
 | 
	
		
			
				|  |  |     private String getToNumberSqlByDBType(DBType dbType, String key) throws Exception {
 | 
	
		
			
				|  |  |         if (dbType.equals(DBType.Mysql)) {
 | 
	
		
			
				|  |  |             return "cast(" + key + " as signed integer)";
 | 
	
		
			
				|  |  |         } else if (dbType.equals(DBType.Oracle)) {
 | 
	
		
			
				|  |  |             return "to_number(" + key + ")";
 | 
	
		
			
				|  |  |         } else {
 | 
	
		
			
				|  |  |             return key;
 | 
	
		
			
				|  |  |         }
 | 
	
		
			
				|  |  |     }
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  |     /**
 | 
	
		
			
				|  |  |      * 根据数据库类型获取分页sql
 | 
	
		
			
				|  |  |      *
 | 
	
		
			
				|  |  |      * @return
 | 
	
		
			
				|  |  |      */
 | 
	
		
			
				|  |  |     private String getPageSqlByDBType(DBType dbType, String sql, int start, int rows) throws Exception {
 | 
	
		
			
				|  |  |         if (dbType.equals(DBType.Mysql)) {
 | 
	
		
			
				|  |  |             return sql + " LIMIT " + start + "," + rows;
 | 
	
		
			
				|  |  |         } else if (dbType.equals(DBType.Oracle)) {
 | 
	
		
			
				|  |  |             return " select * from (select t.*,ROWNUM RSCOM_RN from (" + sql + ") t where ROWNUM<" + (start + rows + 1) + ") where RSCOM_RN>= " + (start + 1);
 | 
	
		
			
				|  |  |         } else {
 | 
	
		
			
				|  |  |             return sql;
 | 
	
		
			
				|  |  |         }
 | 
	
		
			
				|  |  |     }
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  |     /**
 | 
	
		
			
				|  |  |      * 字典全转换成中文
 | 
	
		
			
				|  |  |      */
 | 
	
		
			
				|  |  |     private List<JSONObject> translateDictCN(List<JSONObject> list, JSONArray colList, String schemeVersion) throws Exception {
 | 
	
		
			
				|  |  |         //获取字典列表
 | 
	
		
			
				|  |  |         List<DtoDictCol> dictColList = new ArrayList<>();
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  |         for (int i = 0; i < colList.length(); i++) {
 | 
	
		
			
				|  |  |             JSONObject col = colList.getJSONObject(i);
 | 
	
		
			
				|  |  |             String dictId = col.optString("adapterDictId");
 | 
	
		
			
				|  |  |             if (dictId != null && dictId.length() > 0) {
 | 
	
		
			
				|  |  |                 String dictType = col.optString("adapterDataType");
 | 
	
		
			
				|  |  |                 String stdMetadataCode = col.optString("stdMetadataCode");
 | 
	
		
			
				|  |  |                 DtoDictCol dictCol = new DtoDictCol();
 | 
	
		
			
				|  |  |                 dictCol.setStdMetadataCode(stdMetadataCode);
 | 
	
		
			
				|  |  |                 dictCol.setStdDictId(dictId);
 | 
	
		
			
				|  |  |                 dictCol.setAdapterDataType(dictType.length() > 0 ? dictType : "1");//默认通过code转换字典
 | 
	
		
			
				|  |  |                 //获取字典数据
 | 
	
		
			
				|  |  |                 List dictString = stdService.getDictByScheme(schemeVersion, dictId);
 | 
	
		
			
				|  |  |                 JSONArray dictAdapterArray = new JSONArray(dictString);
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  |                 dictCol.setDictList(dictAdapterArray);
 | 
	
		
			
				|  |  |                 dictColList.add(dictCol);
 | 
	
		
			
				|  |  |             }
 | 
	
		
			
				|  |  |         }
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  |         //翻译列表
 | 
	
		
			
				|  |  |         for (JSONObject data : list) {
 | 
	
		
			
				|  |  |             //遍历字典字段
 | 
	
		
			
				|  |  |             for (DtoDictCol col : dictColList) {
 | 
	
		
			
				|  |  |                 String colNmae = col.getStdMetadataCode();
 | 
	
		
			
				|  |  |                 String oldValue = data.optString(colNmae);
 | 
	
		
			
				|  |  |                 String newValue = translateDictValueCN(oldValue, col.getAdapterDataType(), col.getDictList());
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  |                 if (newValue != null && newValue.length() > 0) {
 | 
	
		
			
				|  |  |                     data.put(colNmae, newValue);
 | 
	
		
			
				|  |  |                 }
 | 
	
		
			
				|  |  |             }
 | 
	
		
			
				|  |  |         }
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  |         return list;
 | 
	
		
			
				|  |  |     }
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  |     /**
 | 
	
		
			
				|  |  |      * 转译字典成中文
 | 
	
		
			
				|  |  |      *
 | 
	
		
			
				|  |  |      * @return
 | 
	
		
			
				|  |  |      */
 | 
	
		
			
				|  |  |     private String translateDictValueCN(String oldValue, String type, JSONArray dictAdapterList) throws Exception {
 | 
	
		
			
				|  |  |         if (type.equals("0")) //原本就是值
 | 
	
		
			
				|  |  |         {
 | 
	
		
			
				|  |  |             return oldValue;
 | 
	
		
			
				|  |  |         }
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  |         //遍历字典数据(编码->名称)
 | 
	
		
			
				|  |  |         for (int i = 0; i < dictAdapterList.length(); i++) {
 | 
	
		
			
				|  |  |             JSONObject dictItem = dictAdapterList.getJSONObject(i);
 | 
	
		
			
				|  |  |             if (oldValue != null && dictItem.has("stdEntryCode")) {
 | 
	
		
			
				|  |  |                 if (oldValue.equals(dictItem.getString("stdEntryCode"))) {
 | 
	
		
			
				|  |  |                     String newValue = dictItem.getString("stdEntryValue"); //名称
 | 
	
		
			
				|  |  |                     return newValue;
 | 
	
		
			
				|  |  |                 }
 | 
	
		
			
				|  |  |             }
 | 
	
		
			
				|  |  |         }
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  |         return oldValue;
 | 
	
		
			
				|  |  |     }
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  |     /**
 | 
	
		
			
				|  |  |      * 字典转换
 | 
	
		
			
				|  |  |      *
 | 
	
		
			
				|  |  |      * @param list
 | 
	
		
			
				|  |  |      * @param colList
 | 
	
		
			
				|  |  |      * @return
 | 
	
		
			
				|  |  |      * @throws Exception
 | 
	
		
			
				|  |  |      */
 | 
	
		
			
				|  |  |     private List<JSONObject> translateDict(List<JSONObject> list, JSONArray colList, String schemeVersion) throws Exception {
 | 
	
		
			
				|  |  |         //获取字典列表
 | 
	
		
			
				|  |  |         List<DtoDictCol> dictColList = new ArrayList<>();
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  |         for (int i = 0; i < colList.length(); i++) {
 | 
	
		
			
				|  |  |             JSONObject col = colList.getJSONObject(i);
 | 
	
		
			
				|  |  |             String dictId = col.optString("adapterDictId");
 | 
	
		
			
				|  |  |             if (dictId != null && dictId.length() > 0) {
 | 
	
		
			
				|  |  |                 String dictType = col.optString("adapterDataType");
 | 
	
		
			
				|  |  |                 String stdMetadataCode = col.optString("stdMetadataCode");
 | 
	
		
			
				|  |  |                 DtoDictCol dictCol = new DtoDictCol();
 | 
	
		
			
				|  |  |                 dictCol.setStdMetadataCode(stdMetadataCode);
 | 
	
		
			
				|  |  |                 dictCol.setStdDictId(dictId);
 | 
	
		
			
				|  |  |                 dictCol.setAdapterDataType(dictType.length() > 0 ? dictType : "1");//默认通过code转换字典
 | 
	
		
			
				|  |  |                 //获取字典数据
 | 
	
		
			
				|  |  |                 List dictString = stdService.getDictByScheme(schemeVersion, dictId);
 | 
	
		
			
				|  |  |                 JSONArray dictAdapterArray = new JSONArray(dictString);
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  |                 dictCol.setDictList(dictAdapterArray);
 | 
	
		
			
				|  |  |                 dictColList.add(dictCol);
 | 
	
		
			
				|  |  |             }
 | 
	
		
			
				|  |  |         }
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  |         //翻译列表
 | 
	
		
			
				|  |  |         for (JSONObject data : list) {
 | 
	
		
			
				|  |  |             //遍历字典字段
 | 
	
		
			
				|  |  |             for (DtoDictCol col : dictColList) {
 | 
	
		
			
				|  |  |                 String colNmae = col.getStdMetadataCode();
 | 
	
		
			
				|  |  |                 String oldValue = data.optString(colNmae);
 | 
	
		
			
				|  |  |                 String newValue = translateDictValue(oldValue, col.getAdapterDataType(), col.getDictList());
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  |                 if (newValue != null && newValue.length() > 0) {
 | 
	
		
			
				|  |  |                     data.put(colNmae, newValue);
 | 
	
		
			
				|  |  |                 }
 | 
	
		
			
				|  |  |             }
 | 
	
		
			
				|  |  |         }
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  |         return list;
 | 
	
		
			
				|  |  |     }
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  |     /**
 | 
	
		
			
				|  |  |      * 转译字典
 | 
	
		
			
				|  |  |      *
 | 
	
		
			
				|  |  |      * @return
 | 
	
		
			
				|  |  |      */
 | 
	
		
			
				|  |  |     private String translateDictValue(String oldValue, String type, JSONArray dictAdapterList) throws Exception {
 | 
	
		
			
				|  |  |         //应用标准字段
 | 
	
		
			
				|  |  |         String colName = "adapterEntryCode";
 | 
	
		
			
				|  |  |         if (type.equals("0")) //通过name转译
 | 
	
		
			
				|  |  |         {
 | 
	
		
			
				|  |  |             colName = "adapterEntryValue";
 | 
	
		
			
				|  |  |         }
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  |         //遍历字典数据
 | 
	
		
			
				|  |  |         for (int i = 0; i < dictAdapterList.length(); i++) {
 | 
	
		
			
				|  |  |             JSONObject dictItem = dictAdapterList.getJSONObject(i);
 | 
	
		
			
				|  |  |             if (oldValue != null && dictItem.has(colName)) {
 | 
	
		
			
				|  |  |                 if (oldValue.equals(dictItem.getString(colName))) {
 | 
	
		
			
				|  |  |                     String newValue = dictItem.getString("stdEntryCode");
 | 
	
		
			
				|  |  |                     return newValue;
 | 
	
		
			
				|  |  |                 }
 | 
	
		
			
				|  |  |             }
 | 
	
		
			
				|  |  |         }
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  |         //找不到适配字典数据则返回空
 | 
	
		
			
				|  |  |         return "";
 | 
	
		
			
				|  |  |     }
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  |     /**
 | 
	
		
			
				|  |  |      * 获取过滤条件
 | 
	
		
			
				|  |  |      *
 | 
	
		
			
				|  |  |      * @return
 | 
	
		
			
				|  |  |      */
 | 
	
		
			
				|  |  |     private String getCondition(DBType dbType, String conditionString) {
 | 
	
		
			
				|  |  |         JSONArray array = new JSONArray(conditionString);
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  |         if (array != null && array.length() > 0) {
 | 
	
		
			
				|  |  |             List<QueryCondition> conditions = new ArrayList<>();
 | 
	
		
			
				|  |  |             for (Object item : array) {
 | 
	
		
			
				|  |  |                 JSONObject obj = (JSONObject) item;
 | 
	
		
			
				|  |  |                 String logical = obj.getString("andOr");
 | 
	
		
			
				|  |  |                 String operation = obj.getString("condition");
 | 
	
		
			
				|  |  |                 String field = obj.getString("field");
 | 
	
		
			
				|  |  |                 String keyword = obj.getString("value");
 | 
	
		
			
				|  |  |                 conditions.add(new QueryCondition(logical, operation, field, keyword));
 | 
	
		
			
				|  |  |             }
 | 
	
		
			
				|  |  |             //条件语句转换
 | 
	
		
			
				|  |  |             ParserSql ps;
 | 
	
		
			
				|  |  |             switch (dbType) {
 | 
	
		
			
				|  |  |                 case Oracle:
 | 
	
		
			
				|  |  |                     ps = new ParserOracle();
 | 
	
		
			
				|  |  |                     break;
 | 
	
		
			
				|  |  |                 case Sqlserver:
 | 
	
		
			
				|  |  |                     ps = new ParserSqlserver();
 | 
	
		
			
				|  |  |                     break;
 | 
	
		
			
				|  |  |                 default:
 | 
	
		
			
				|  |  |                     ps = new ParserMysql();
 | 
	
		
			
				|  |  |             }
 | 
	
		
			
				|  |  |             return ps.getConditionSql(conditions);
 | 
	
		
			
				|  |  |         }
 | 
	
		
			
				|  |  |         return "";
 | 
	
		
			
				|  |  |     }
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  |     /**
 | 
	
		
			
				|  |  |      * 获取条件SQL
 | 
	
		
			
				|  |  |      *
 | 
	
		
			
				|  |  |      * @param dbType
 | 
	
		
			
				|  |  |      * @param conditionString
 | 
	
		
			
				|  |  |      * @return
 | 
	
		
			
				|  |  |      * @throws ParseException
 | 
	
		
			
				|  |  |      */
 | 
	
		
			
				|  |  |     private String getConditionSql(DBType dbType, String conditionString) throws ParseException {
 | 
	
		
			
				|  |  |         String conditionSql = "";
 | 
	
		
			
				|  |  |         JSONArray conditions = new JSONArray(conditionString);
 | 
	
		
			
				|  |  |         Iterator iterator = conditions.iterator();
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  |         while (iterator.hasNext()) {
 | 
	
		
			
				|  |  |             JSONObject condition = (JSONObject) iterator.next();
 | 
	
		
			
				|  |  |             String logic = condition.getString("condition");
 | 
	
		
			
				|  |  |             String andOr = condition.getString("andOr");
 | 
	
		
			
				|  |  |             String field = condition.getString("field");
 | 
	
		
			
				|  |  |             String value = condition.getString("value");
 | 
	
		
			
				|  |  |             String fieldType = condition.getString("type");
 | 
	
		
			
				|  |  |             String keys = "";
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  |             if (andOr.equals(" AND ")) {
 | 
	
		
			
				|  |  |                 conditionSql = conditionSql + " and ";
 | 
	
		
			
				|  |  |             } else {
 | 
	
		
			
				|  |  |                 conditionSql = conditionSql + " or ";
 | 
	
		
			
				|  |  |             }
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  |             if (logic.equals(" IN ") || logic.equals(" NOT IN ")) {
 | 
	
		
			
				|  |  |                 String[] keywords = value.split(",");
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  |                 for (String key : keywords) {
 | 
	
		
			
				|  |  |                     keys += "'" + key + "',";
 | 
	
		
			
				|  |  |                 }
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  |                 keys = " (" + keys.substring(0, keys.length() - 1) + ") ";
 | 
	
		
			
				|  |  |             } else if (logic.equals(" LIKE ")) {
 | 
	
		
			
				|  |  |                 keys += " '%" + value + "%' ";
 | 
	
		
			
				|  |  |             } else {
 | 
	
		
			
				|  |  |                 if (fieldType.equals("DATE")) {
 | 
	
		
			
				|  |  |                     keys += getDateFormatSql(dbType, value);
 | 
	
		
			
				|  |  |                 } else {
 | 
	
		
			
				|  |  |                     keys += " '" + value + "' ";
 | 
	
		
			
				|  |  |                 }
 | 
	
		
			
				|  |  |             }
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  |             conditionSql += field + logic + keys;
 | 
	
		
			
				|  |  |         }
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  |         return conditionSql;
 | 
	
		
			
				|  |  |     }
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  |     /**
 | 
	
		
			
				|  |  |      * 获取对应数据库时间格式
 | 
	
		
			
				|  |  |      *
 | 
	
		
			
				|  |  |      * @param dbType
 | 
	
		
			
				|  |  |      * @param key
 | 
	
		
			
				|  |  |      * @return
 | 
	
		
			
				|  |  |      * @throws ParseException
 | 
	
		
			
				|  |  |      */
 | 
	
		
			
				|  |  |     private String getDateFormatSql(DBType dbType, String key) throws ParseException {
 | 
	
		
			
				|  |  |         String dateFormat = "yyyy-MM-dd HH:mm:ss";
 | 
	
		
			
				|  |  |         SimpleDateFormat formatDate = new SimpleDateFormat("yyyy-MM-dd");
 | 
	
		
			
				|  |  |         Date d = formatDate.parse(key);
 | 
	
		
			
				|  |  |         SimpleDateFormat format = new SimpleDateFormat(dateFormat);
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  |         switch (dbType) {
 | 
	
		
			
				|  |  |             case Oracle:
 | 
	
		
			
				|  |  |                 key = "to_date(\'" + format.format(d) + "\',\'YYYY-MM-DD HH24:MI:SS\')";
 | 
	
		
			
				|  |  |                 break;
 | 
	
		
			
				|  |  |             case Sqlserver:
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  |                 break;
 | 
	
		
			
				|  |  |             default:
 | 
	
		
			
				|  |  |                 key = "date_format(\'" + format.format(d) + "\',\'%y-%m-%d %T\')";
 | 
	
		
			
				|  |  |         }
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  |         return key;
 | 
	
		
			
				|  |  |     }
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  |     /**
 | 
	
		
			
				|  |  |      * 采集入库
 | 
	
		
			
				|  |  |      *
 | 
	
		
			
				|  |  |      * @return
 | 
	
		
			
				|  |  |      */
 | 
	
		
			
				|  |  |     private String intoMongodb(List<JSONObject> list, String schemeVersion, String stdDatasetCode, JSONArray colList) {
 | 
	
		
			
				|  |  |         String patientIdCode = SqlConstants.PATIENT_ID.toUpperCase();
 | 
	
		
			
				|  |  |         String eventNoCode = SqlConstants.EVENT_NO.toUpperCase();
 | 
	
		
			
				|  |  |         PatientIdentity patientIdentity = SysConfig.getInstance().getPatientIdentity(stdDatasetCode);
 | 
	
		
			
				|  |  |         if (patientIdentity != null) {
 | 
	
		
			
				|  |  |             patientIdCode = patientIdentity.getPatientIDCode();
 | 
	
		
			
				|  |  |             eventNoCode = patientIdentity.getEventNoCode();
 | 
	
		
			
				|  |  |         }
 | 
	
		
			
				|  |  |         try {
 | 
	
		
			
				|  |  |             if (!mongo.createIndex(stdDatasetCode, "patientIndex", patientIdCode, eventNoCode)) {
 | 
	
		
			
				|  |  |                 return "Mongodb索引创建失败!(表:" + stdDatasetCode + ")";
 | 
	
		
			
				|  |  |             }
 | 
	
		
			
				|  |  |             if (list != null && list.size() > 0) {
 | 
	
		
			
				|  |  |                 //字典未转换前采集到原始库
 | 
	
		
			
				|  |  |                 boolean b = mongoOrigin.insert(stdDatasetCode, translateDictCN(list, colList, schemeVersion));
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  |                 //字典转换
 | 
	
		
			
				|  |  |                 list = translateDict(list, colList, schemeVersion);
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  |                 //采集到mongodb
 | 
	
		
			
				|  |  |                 b = mongo.insert(stdDatasetCode, list);
 | 
	
		
			
				|  |  |                 if (!b) {
 | 
	
		
			
				|  |  |                     if (mongo.errorMessage != null && mongo.errorMessage.length() > 0) {
 | 
	
		
			
				|  |  |                         logger.debug(mongo.errorMessage);
 | 
	
		
			
				|  |  |                         return mongo.errorMessage;
 | 
	
		
			
				|  |  |                     } else {
 | 
	
		
			
				|  |  |                         return "Mongodb保存失败!(表:" + stdDatasetCode + ")";
 | 
	
		
			
				|  |  |                     }
 | 
	
		
			
				|  |  |                 }
 | 
	
		
			
				|  |  |             }
 | 
	
		
			
				|  |  |         } catch (Exception e) {
 | 
	
		
			
				|  |  |             return e.getMessage();
 | 
	
		
			
				|  |  |         }
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  |         return "";
 | 
	
		
			
				|  |  |     }
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  |     /**
 | 
	
		
			
				|  |  |      * 数据库表采集
 | 
	
		
			
				|  |  |      *
 | 
	
		
			
				|  |  |      * @return
 | 
	
		
			
				|  |  |      */
 | 
	
		
			
				|  |  |     private String collectTable(DtoJobDataset ds, String schemeVersion, String logId) throws Exception {
 | 
	
		
			
				|  |  |         String message = "";
 | 
	
		
			
				|  |  |         String datasetId = ds.getJobDatasetId();
 | 
	
		
			
				|  |  |         String jobDatasetName = ds.getJobDatasetName();
 | 
	
		
			
				|  |  |         String condition = ds.getJobDatasetCondition();
 | 
	
		
			
				|  |  |         String key = ds.getJobDatasetKey();
 | 
	
		
			
				|  |  |         String keytype = ds.getJobDatasetKeytype();
 | 
	
		
			
				|  |  |         String keyvalue = ds.getJobDatasetKeyvalue();
 | 
	
		
			
				|  |  |         String orgCode = ds.getOrgCode();
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  |         String datasourceId = ds.getDatasourceId();
 | 
	
		
			
				|  |  |         String config = ds.getConfig(); //数据库连接
 | 
	
		
			
				|  |  |         DBHelper db = new DBHelper(datasourceId, config);
 | 
	
		
			
				|  |  |         DBType dbType = db.dbType;
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  |         //获取数据集映射
 | 
	
		
			
				|  |  |         List datasetString = stdService.getDatasetByScheme(schemeVersion, datasetId);
 | 
	
		
			
				|  |  |         JSONArray datasetList = new JSONArray(datasetString);
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  |         if (datasetList != null && datasetList.length() > 0) {
 | 
	
		
			
				|  |  |             String stdTableName = datasetList.getJSONObject(0).optString("stdDatasetCode");
 | 
	
		
			
				|  |  |             String adapterTableName = datasetList.getJSONObject(0).optString("adapterDatasetCode");
 | 
	
		
			
				|  |  |             //获取数据集字段映射结构
 | 
	
		
			
				|  |  |             List colString = stdService.getDatacolByScheme(schemeVersion, datasetId);
 | 
	
		
			
				|  |  |             JSONArray colList = new JSONArray(colString);
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  |             if (colList != null && colList.length() > 0) {
 | 
	
		
			
				|  |  |                 //拼接查询sql
 | 
	
		
			
				|  |  |                 String strSql = "Select '" + orgCode + "' as RSCOM_ORG_CODE";
 | 
	
		
			
				|  |  |                 for (int i = 0; i < colList.length(); i++) {
 | 
	
		
			
				|  |  |                     JSONObject col = colList.getJSONObject(i);
 | 
	
		
			
				|  |  |                     String adapterMetadataCode = col.optString("adapterMetadataCode");
 | 
	
		
			
				|  |  |                     if (adapterMetadataCode.length() > 0) {
 | 
	
		
			
				|  |  |                         strSql += "," + adapterMetadataCode + " as " + col.optString("stdMetadataCode");
 | 
	
		
			
				|  |  |                     }
 | 
	
		
			
				|  |  |                 }
 | 
	
		
			
				|  |  |                 strSql += " from " + adapterTableName;
 | 
	
		
			
				|  |  |                 String strWhere = " where 1=1";
 | 
	
		
			
				|  |  |                 //采集范围
 | 
	
		
			
				|  |  |                 if (condition != null && condition.length() > 0) {
 | 
	
		
			
				|  |  |                     strWhere += getConditionSql(dbType, condition);
 | 
	
		
			
				|  |  |                 }
 | 
	
		
			
				|  |  |                 //增量采集
 | 
	
		
			
				|  |  |                 String maxKey = "0";
 | 
	
		
			
				|  |  |                 if (key != null && key.length() > 0) {
 | 
	
		
			
				|  |  |                     maxKey = key;
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  |                     if (keytype.toUpperCase().equals("DATE")) //时间类型
 | 
	
		
			
				|  |  |                     {
 | 
	
		
			
				|  |  |                         if (keyvalue != null && keyvalue.length() > 0) {
 | 
	
		
			
				|  |  |                             Date keyDate = new Date();
 | 
	
		
			
				|  |  |                             //字符串转时间
 | 
	
		
			
				|  |  |                             keyDate = DateConvert.toDate(keyvalue);
 | 
	
		
			
				|  |  |                             //根据数据库类型获取时间sql
 | 
	
		
			
				|  |  |                             strWhere += " and " + maxKey + ">'" + getDateSqlByDBType(dbType, keyDate) + "'";
 | 
	
		
			
				|  |  |                         }
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  |                     } else if (keytype.toUpperCase().equals("VARCHAR")) //字符串类型
 | 
	
		
			
				|  |  |                     {
 | 
	
		
			
				|  |  |                         maxKey = getToNumberSqlByDBType(dbType, key);
 | 
	
		
			
				|  |  |                         if (keyvalue != null && keyvalue.length() > 0) {
 | 
	
		
			
				|  |  |                             strWhere += " and " + maxKey + ">'" + keyvalue + "'";
 | 
	
		
			
				|  |  |                         }
 | 
	
		
			
				|  |  |                     } else {
 | 
	
		
			
				|  |  |                         if (keyvalue != null && keyvalue.length() > 0) {
 | 
	
		
			
				|  |  |                             strWhere += " and " + maxKey + ">'" + keyvalue + "'";
 | 
	
		
			
				|  |  |                         }
 | 
	
		
			
				|  |  |                     }
 | 
	
		
			
				|  |  |                     strWhere += " order by " + maxKey;
 | 
	
		
			
				|  |  |                 }
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  |                 strSql += strWhere;
 | 
	
		
			
				|  |  |                 //总条数
 | 
	
		
			
				|  |  |                 String sqlCount = "select count(1) as COUNT from (" + strSql + ")";
 | 
	
		
			
				|  |  |                 String sqlMax = "select max(" + maxKey + ") as MAX_KEYVALUE from " + adapterTableName + strWhere;
 | 
	
		
			
				|  |  |                 JSONObject objCount = db.load(sqlCount);
 | 
	
		
			
				|  |  |                 if (objCount == null) {
 | 
	
		
			
				|  |  |                     if (db.errorMessage.length() > 0) {
 | 
	
		
			
				|  |  |                         throw new Exception(db.errorMessage);
 | 
	
		
			
				|  |  |                     } else {
 | 
	
		
			
				|  |  |                         throw new Exception("查询异常:" + sqlCount);
 | 
	
		
			
				|  |  |                     }
 | 
	
		
			
				|  |  |                 } else {
 | 
	
		
			
				|  |  |                     int count = objCount.getInt("COUNT");
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  |                     if (count == 0) //0条记录,无需采集
 | 
	
		
			
				|  |  |                     {
 | 
	
		
			
				|  |  |                         message = "0条记录,无需采集。";
 | 
	
		
			
				|  |  |                     } else {
 | 
	
		
			
				|  |  |                         //获取最大值
 | 
	
		
			
				|  |  |                         JSONObject objMax = db.load(sqlMax);
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  |                         int successCount = 0;
 | 
	
		
			
				|  |  |                         String maxKeyvalue = objMax.optString("MAX_KEYVALUE");
 | 
	
		
			
				|  |  |                         //修改最大值
 | 
	
		
			
				|  |  |                         if (maxKeyvalue != null && maxKeyvalue.length() > 0) {
 | 
	
		
			
				|  |  |                             datacollectLogDao.updateJobDatasetKeyvalue(ds.getId(), maxKeyvalue);
 | 
	
		
			
				|  |  |                             logger.info("修改任务数据集最大值为" + maxKeyvalue + "。"); //文本日志
 | 
	
		
			
				|  |  |                         }
 | 
	
		
			
				|  |  |                         int countPage = 1;
 | 
	
		
			
				|  |  |                         if (count > maxNum) //分页采集
 | 
	
		
			
				|  |  |                         {
 | 
	
		
			
				|  |  |                             countPage = count / maxNum + 1;
 | 
	
		
			
				|  |  |                         }
 | 
	
		
			
				|  |  |                         for (int i = 0; i < countPage; i++) {
 | 
	
		
			
				|  |  |                             int rows = maxNum;
 | 
	
		
			
				|  |  |                             if (i + 1 == countPage) {
 | 
	
		
			
				|  |  |                                 rows = count - i * maxNum;
 | 
	
		
			
				|  |  |                             }
 | 
	
		
			
				|  |  |                             String sql = getPageSqlByDBType(dbType, strSql, i * maxNum, rows); //获取分页sql语句
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  |                             RsJobLogDetail detail = new RsJobLogDetail();
 | 
	
		
			
				|  |  |                             detail.setStartTime(new Date());
 | 
	
		
			
				|  |  |                             detail.setJobLogId(logId);
 | 
	
		
			
				|  |  |                             detail.setDatasourceId(datasourceId);
 | 
	
		
			
				|  |  |                             detail.setConfig(config);
 | 
	
		
			
				|  |  |                             detail.setStdDatasetCode(stdTableName);
 | 
	
		
			
				|  |  |                             detail.setJobDatasetId(datasetId);
 | 
	
		
			
				|  |  |                             detail.setJobDatasetName(ds.getJobDatasetName());
 | 
	
		
			
				|  |  |                             detail.setJobId(ds.getJobId());
 | 
	
		
			
				|  |  |                             detail.setJobSql(sql);
 | 
	
		
			
				|  |  |                             detail.setJobNum(i + 1);
 | 
	
		
			
				|  |  |                             detail.setJobDatasetRows(rows);
 | 
	
		
			
				|  |  |                             detail.setSchemeVersion(schemeVersion);
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  |                             List<JSONObject> list = db.query(sql);
 | 
	
		
			
				|  |  |                             String msg = "";
 | 
	
		
			
				|  |  |                             if (list != null) {
 | 
	
		
			
				|  |  |                                 msg = intoMongodb(list, schemeVersion, stdTableName, colList); //返回信息
 | 
	
		
			
				|  |  |                             } else {
 | 
	
		
			
				|  |  |                                 if (db.errorMessage.length() > 0) {
 | 
	
		
			
				|  |  |                                     msg = db.errorMessage;
 | 
	
		
			
				|  |  |                                 } else {
 | 
	
		
			
				|  |  |                                     msg = "查询数据为空!";
 | 
	
		
			
				|  |  |                                 }
 | 
	
		
			
				|  |  |                             }
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  |                             if (msg.length() > 0) {
 | 
	
		
			
				|  |  |                                 //任务日志细表异常操作
 | 
	
		
			
				|  |  |                                 detail.setJobStatus("0");
 | 
	
		
			
				|  |  |                                 detail.setJobContent(msg);
 | 
	
		
			
				|  |  |                                 logger.info(msg); //文本日志
 | 
	
		
			
				|  |  |                             } else {
 | 
	
		
			
				|  |  |                                 detail.setJobStatus("1");
 | 
	
		
			
				|  |  |                                 detail.setJobContent("采集成功!");
 | 
	
		
			
				|  |  |                                 successCount += rows;
 | 
	
		
			
				|  |  |                             }
 | 
	
		
			
				|  |  |                             detail.setEndTime(new Date());
 | 
	
		
			
				|  |  |                             datacollectLogDao.saveEntity(detail);
 | 
	
		
			
				|  |  |                         }
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  |                         message = jobDatasetName + "采集成功" + successCount + "条数据,总条数" + count + "条。";
 | 
	
		
			
				|  |  |                     }
 | 
	
		
			
				|  |  |                 }
 | 
	
		
			
				|  |  |             } else {
 | 
	
		
			
				|  |  |                 throw new Exception(jobDatasetName + "数据集字段映射为空!");
 | 
	
		
			
				|  |  |             }
 | 
	
		
			
				|  |  |         } else {
 | 
	
		
			
				|  |  |             throw new Exception(jobDatasetName + "数据集映射为空!");
 | 
	
		
			
				|  |  |         }
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  |         logger.info(message);
 | 
	
		
			
				|  |  |         return message;
 | 
	
		
			
				|  |  |     }
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  |     /**
 | 
	
		
			
				|  |  |      * XML转JSONList
 | 
	
		
			
				|  |  |      *
 | 
	
		
			
				|  |  |      * @return
 | 
	
		
			
				|  |  |      */
 | 
	
		
			
				|  |  |     private List<JSONObject> getListFromXml(String xml) throws Exception {
 | 
	
		
			
				|  |  |         SAXReader reader = new SAXReader();
 | 
	
		
			
				|  |  |         Document doc = reader.read(new ByteArrayInputStream(xml.getBytes("UTF-8")));
 | 
	
		
			
				|  |  |         Element root = doc.getRootElement();
 | 
	
		
			
				|  |  |         List<JSONObject> re = new ArrayList<>();
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  |         //xml数据列表
 | 
	
		
			
				|  |  |         Iterator iter = root.elementIterator("Data");
 | 
	
		
			
				|  |  |         while (iter.hasNext()) {
 | 
	
		
			
				|  |  |             JSONObject obj = new JSONObject();
 | 
	
		
			
				|  |  |             Element el = (Element) iter.next();
 | 
	
		
			
				|  |  |             Iterator cols = el.elementIterator();
 | 
	
		
			
				|  |  |             while (cols.hasNext()) {
 | 
	
		
			
				|  |  |                 Element col = (Element) cols.next();
 | 
	
		
			
				|  |  |                 obj.put(col.getName().toUpperCase(), col.getStringValue());
 | 
	
		
			
				|  |  |             }
 | 
	
		
			
				|  |  |             re.add(obj);
 | 
	
		
			
				|  |  |         }
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  |         return re;
 | 
	
		
			
				|  |  |     }
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  |     /**
 | 
	
		
			
				|  |  |      * webservice采集
 | 
	
		
			
				|  |  |      *
 | 
	
		
			
				|  |  |      * @return
 | 
	
		
			
				|  |  |      */
 | 
	
		
			
				|  |  |     private String collectWebservice(DtoJobDataset ds, String schemeVersion, String logId) throws Exception {
 | 
	
		
			
				|  |  |         String message = "";
 | 
	
		
			
				|  |  |         String datasetId = ds.getJobDatasetId();
 | 
	
		
			
				|  |  |         String jobDatasetName = ds.getJobDatasetName();
 | 
	
		
			
				|  |  |         String condition = ds.getJobDatasetCondition();
 | 
	
		
			
				|  |  |         String key = ds.getJobDatasetKey();
 | 
	
		
			
				|  |  |         String keytype = ds.getJobDatasetKeytype();
 | 
	
		
			
				|  |  |         String keyvalue = ds.getJobDatasetKeyvalue();
 | 
	
		
			
				|  |  |         String orgCode = ds.getOrgCode();
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  |         String datasourceId = ds.getDatasourceId();
 | 
	
		
			
				|  |  |         String config = ds.getConfig(); //数据库连接
 | 
	
		
			
				|  |  |         DBType dbType = DBType.Oracle;//********** 先定死Oracle ****************************
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  |         //webservice地址
 | 
	
		
			
				|  |  |         Map<String, String> mapConfig = objectMapper.readValue(config, Map.class);
 | 
	
		
			
				|  |  |         if (mapConfig.containsKey("protocol") && mapConfig.containsKey("url")) {
 | 
	
		
			
				|  |  |             String url = mapConfig.get("protocol") + "://" + mapConfig.get("url");
 | 
	
		
			
				|  |  |             //获取数据集映射
 | 
	
		
			
				|  |  |             List datasetString = stdService.getDatasetByScheme(schemeVersion, datasetId);
 | 
	
		
			
				|  |  |             JSONArray datasetList = new JSONArray(datasetString);
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  |             if (datasetList != null && datasetList.length() > 0) {
 | 
	
		
			
				|  |  |                 String stdTableName = datasetList.getJSONObject(0).optString("stdDatasetCode");
 | 
	
		
			
				|  |  |                 String adapterTableName = datasetList.getJSONObject(0).optString("adapterDatasetCode");
 | 
	
		
			
				|  |  |                 //获取数据集字段映射结构
 | 
	
		
			
				|  |  |                 List colString = stdService.getDatacolByScheme(schemeVersion, datasetId);
 | 
	
		
			
				|  |  |                 JSONArray colList = new JSONArray(colString);
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  |                 if (colList != null && colList.length() > 0) {
 | 
	
		
			
				|  |  |                     //拼接查询sql
 | 
	
		
			
				|  |  |                     String strSql = "Select '" + orgCode + "' as RSCOM_ORG_CODE";
 | 
	
		
			
				|  |  |                     for (int i = 0; i < colList.length(); i++) {
 | 
	
		
			
				|  |  |                         JSONObject col = colList.getJSONObject(i);
 | 
	
		
			
				|  |  |                         String adapterMetadataCode = col.optString("adapterMetadataCode");
 | 
	
		
			
				|  |  |                         if (adapterMetadataCode.length() > 0) {
 | 
	
		
			
				|  |  |                             strSql += "," + adapterMetadataCode + " as " + col.optString("stdMetadataCode");
 | 
	
		
			
				|  |  |                         }
 | 
	
		
			
				|  |  |                     }
 | 
	
		
			
				|  |  |                     strSql += " from " + adapterTableName;
 | 
	
		
			
				|  |  |                     String strWhere = " where 1=1";
 | 
	
		
			
				|  |  |                     //采集范围
 | 
	
		
			
				|  |  |                     if (condition != null && condition.length() > 0) {
 | 
	
		
			
				|  |  |                         strWhere += getConditionSql(dbType, condition);
 | 
	
		
			
				|  |  |                     }
 | 
	
		
			
				|  |  |                     //增量采集
 | 
	
		
			
				|  |  |                     String maxKey = "0";
 | 
	
		
			
				|  |  |                     String keyValue = ds.getJobDatasetKeyvalue();
 | 
	
		
			
				|  |  |                     if (key != null && key.length() > 0) {
 | 
	
		
			
				|  |  |                         maxKey = key;
 | 
	
		
			
				|  |  |                         if (keytype.toUpperCase().equals("DATE")) //时间类型
 | 
	
		
			
				|  |  |                         {
 | 
	
		
			
				|  |  |                             Date keyDate = new Date();
 | 
	
		
			
				|  |  |                             if (keyvalue != null && keyvalue.length() > 0) {
 | 
	
		
			
				|  |  |                                 //字符串转时间
 | 
	
		
			
				|  |  |                                 keyDate = DateConvert.toDate(keyvalue);
 | 
	
		
			
				|  |  |                                 //根据数据库类型获取时间sql
 | 
	
		
			
				|  |  |                                 strWhere += " and " + key + ">'" + getDateSqlByDBType(dbType, keyDate) + "'";
 | 
	
		
			
				|  |  |                                 strWhere += " order by " + key;
 | 
	
		
			
				|  |  |                             }
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  |                         } else if (keytype.toUpperCase().equals("VARCHAR")) //字符串类型
 | 
	
		
			
				|  |  |                         {
 | 
	
		
			
				|  |  |                             maxKey = getToNumberSqlByDBType(dbType, key);
 | 
	
		
			
				|  |  |                             if (keyvalue != null && keyvalue.length() > 0) {
 | 
	
		
			
				|  |  |                                 strWhere += " and " + maxKey + ">'" + keyvalue + "'";
 | 
	
		
			
				|  |  |                                 strWhere += " order by " + maxKey;
 | 
	
		
			
				|  |  |                             }
 | 
	
		
			
				|  |  |                         } else {
 | 
	
		
			
				|  |  |                             if (keyvalue != null && keyvalue.length() > 0) {
 | 
	
		
			
				|  |  |                                 strWhere += " and " + key + ">'" + keyvalue + "'";
 | 
	
		
			
				|  |  |                                 strWhere += " order by " + key;
 | 
	
		
			
				|  |  |                             }
 | 
	
		
			
				|  |  |                         }
 | 
	
		
			
				|  |  |                     }
 | 
	
		
			
				|  |  |                     strSql += strWhere;
 | 
	
		
			
				|  |  |                     //总条数和最大值查询
 | 
	
		
			
				|  |  |                     String sqlCount = "select count(1) as COUNT from (" + strSql + ")";
 | 
	
		
			
				|  |  |                     String sqlMax = "select max(" + maxKey + ") as MAX_KEYVALUE from " + adapterTableName + strWhere;
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  |                     //webservice获取数据总条数
 | 
	
		
			
				|  |  |                     String strCount = "";//WebserviceUtil.request(url, "ExcuteSQL", new Object[]{"", sqlCount});
 | 
	
		
			
				|  |  |                     List<JSONObject> dataCount = getListFromXml(strCount);
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  |                     if (dataCount != null && dataCount.size() > 0) {
 | 
	
		
			
				|  |  |                         Integer count = Integer.parseInt(dataCount.get(0).getString("COUNT"));
 | 
	
		
			
				|  |  |                         if (count == 0) //0条记录,无需采集
 | 
	
		
			
				|  |  |                         {
 | 
	
		
			
				|  |  |                             message = "0条记录,无需采集。";
 | 
	
		
			
				|  |  |                         } else {
 | 
	
		
			
				|  |  |                             //webservice获取最大值
 | 
	
		
			
				|  |  |                             String strMax = ""; //WebserviceUtil.request(url, "ExcuteSQL", new Object[]{"", sqlMax});
 | 
	
		
			
				|  |  |                             List<JSONObject> dataMax = getListFromXml(strCount);
 | 
	
		
			
				|  |  |                             int successCount = 0;
 | 
	
		
			
				|  |  |                             String maxKeyvalue = dataMax.get(0).getString("MAX_KEYVALUE");
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  |                             //修改最大值
 | 
	
		
			
				|  |  |                             if (maxKeyvalue != null && maxKeyvalue.length() > 0) {
 | 
	
		
			
				|  |  |                                 datacollectLogDao.updateJobDatasetKeyvalue(ds.getId(), maxKeyvalue);
 | 
	
		
			
				|  |  |                                 logger.info("修改任务数据集最大值为" + maxKeyvalue + "。"); //文本日志
 | 
	
		
			
				|  |  |                             }
 | 
	
		
			
				|  |  |                             int countPage = 1;
 | 
	
		
			
				|  |  |                             if (count > maxNum) //分页采集
 | 
	
		
			
				|  |  |                             {
 | 
	
		
			
				|  |  |                                 countPage = count / maxNum + 1;
 | 
	
		
			
				|  |  |                             }
 | 
	
		
			
				|  |  |                             for (int i = 0; i < countPage; i++) {
 | 
	
		
			
				|  |  |                                 int rows = maxNum;
 | 
	
		
			
				|  |  |                                 if (i + 1 == countPage) {
 | 
	
		
			
				|  |  |                                     rows = count - i * maxNum;
 | 
	
		
			
				|  |  |                                 }
 | 
	
		
			
				|  |  |                                 String sql = getPageSqlByDBType(dbType, strSql, i * maxNum, rows); //获取分页sql语句
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  |                                 RsJobLogDetail detail = new RsJobLogDetail();
 | 
	
		
			
				|  |  |                                 detail.setStartTime(new Date());
 | 
	
		
			
				|  |  |                                 detail.setJobLogId(logId);
 | 
	
		
			
				|  |  |                                 detail.setDatasourceId(datasourceId);
 | 
	
		
			
				|  |  |                                 detail.setConfig(config);
 | 
	
		
			
				|  |  |                                 detail.setStdDatasetCode(stdTableName);
 | 
	
		
			
				|  |  |                                 detail.setJobDatasetId(datasetId);
 | 
	
		
			
				|  |  |                                 detail.setJobDatasetName(ds.getJobDatasetName());
 | 
	
		
			
				|  |  |                                 detail.setJobId(ds.getJobId());
 | 
	
		
			
				|  |  |                                 detail.setJobSql(sql);
 | 
	
		
			
				|  |  |                                 detail.setJobNum(i + 1);
 | 
	
		
			
				|  |  |                                 detail.setJobDatasetRows(rows);
 | 
	
		
			
				|  |  |                                 detail.setSchemeVersion(schemeVersion);
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  |                                 String msg = "";
 | 
	
		
			
				|  |  |                                 try {
 | 
	
		
			
				|  |  |                                     //获取分页数据
 | 
	
		
			
				|  |  |                                     String strList = ""; //WebserviceUtil.request(url, "ExcuteSQL", new Object[]{"", sql});
 | 
	
		
			
				|  |  |                                     List<JSONObject> list = getListFromXml(strList);
 | 
	
		
			
				|  |  |                                     if (list != null) {
 | 
	
		
			
				|  |  |                                         msg = intoMongodb(list, schemeVersion, stdTableName, colList); //返回信息
 | 
	
		
			
				|  |  |                                     } else {
 | 
	
		
			
				|  |  |                                         msg = "查询数据为空!";
 | 
	
		
			
				|  |  |                                     }
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  |                                     if (msg.length() > 0) {
 | 
	
		
			
				|  |  |                                         //任务日志细表异常操作
 | 
	
		
			
				|  |  |                                         detail.setJobStatus("0");
 | 
	
		
			
				|  |  |                                         detail.setJobContent(msg);
 | 
	
		
			
				|  |  |                                         logger.info(msg); //文本日志
 | 
	
		
			
				|  |  |                                     } else {
 | 
	
		
			
				|  |  |                                         detail.setJobStatus("1");
 | 
	
		
			
				|  |  |                                         detail.setJobContent("采集成功!");
 | 
	
		
			
				|  |  |                                         successCount += rows;
 | 
	
		
			
				|  |  |                                     }
 | 
	
		
			
				|  |  |                                 } catch (Exception ex) {
 | 
	
		
			
				|  |  |                                     msg = ex.getMessage();
 | 
	
		
			
				|  |  |                                 }
 | 
	
		
			
				|  |  |                                 detail.setEndTime(new Date());
 | 
	
		
			
				|  |  |                                 datacollectLogDao.saveEntity(detail);
 | 
	
		
			
				|  |  |                             }
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  |                             message = jobDatasetName + "采集成功" + successCount + "条数据,总条数" + count + "条。";
 | 
	
		
			
				|  |  |                         }
 | 
	
		
			
				|  |  |                     }
 | 
	
		
			
				|  |  |                 } else {
 | 
	
		
			
				|  |  |                     throw new Exception(jobDatasetName + "数据集字段映射为空!");
 | 
	
		
			
				|  |  |                 }
 | 
	
		
			
				|  |  |             } else {
 | 
	
		
			
				|  |  |                 throw new Exception(jobDatasetName + "数据集映射为空!");
 | 
	
		
			
				|  |  |             }
 | 
	
		
			
				|  |  |         } else {
 | 
	
		
			
				|  |  |             throw new Exception("非法webservice路径!");
 | 
	
		
			
				|  |  |         }
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  |         logger.info(message);
 | 
	
		
			
				|  |  |         return message;
 | 
	
		
			
				|  |  |     }
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  |     /**
 | 
	
		
			
				|  |  |      * 采集入库(包含blob字段处理)
 | 
	
		
			
				|  |  |      *
 | 
	
		
			
				|  |  |      * @return
 | 
	
		
			
				|  |  |      */
 | 
	
		
			
				|  |  |     private String intoMongodb2(List<JSONObject> list, String schemeVersion, String stdDatasetCode, JSONArray colList) {
 | 
	
		
			
				|  |  |         String patientIdCode = SqlConstants.PATIENT_ID.toUpperCase();
 | 
	
		
			
				|  |  |         String eventNoCode = SqlConstants.EVENT_NO.toUpperCase();
 | 
	
		
			
				|  |  |         PatientIdentity patientIdentity = SysConfig.getInstance().getPatientIdentity(stdDatasetCode);
 | 
	
		
			
				|  |  |         if (patientIdentity != null) {
 | 
	
		
			
				|  |  |             patientIdCode = patientIdentity.getPatientIDCode();
 | 
	
		
			
				|  |  |             eventNoCode = patientIdentity.getEventNoCode();
 | 
	
		
			
				|  |  |         }
 | 
	
		
			
				|  |  |         try {
 | 
	
		
			
				|  |  |             if (!mongo.createIndex(stdDatasetCode, "patientIndex", patientIdCode, eventNoCode)) {
 | 
	
		
			
				|  |  |                 return "Mongodb索引创建失败!(表:" + stdDatasetCode + ")";
 | 
	
		
			
				|  |  |             }
 | 
	
		
			
				|  |  |             if (list != null && list.size() > 0) {
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  |                 //TODO TOSET 判断是否是非结构化数据集
 | 
	
		
			
				|  |  |                 if ("unstructured".equals(stdDatasetCode)) {
 | 
	
		
			
				|  |  |                     for (JSONObject jsonObject : list) {
 | 
	
		
			
				|  |  |                         //文件内容保存到GridFS,细表内容字段保存为文件objctId
 | 
	
		
			
				|  |  |                         Blob blob = (Blob) jsonObject.get("CONTENT");
 | 
	
		
			
				|  |  |                         String type = (String) jsonObject.get("FILE_TYPE");
 | 
	
		
			
				|  |  |                         String patientId = (String) jsonObject.get("patient_id");
 | 
	
		
			
				|  |  |                         String eventNo = (String) jsonObject.get("event_no");
 | 
	
		
			
				|  |  |                         Map<String, Object> params = new HashMap<>();
 | 
	
		
			
				|  |  |                         params.put("patient_id", patientId);
 | 
	
		
			
				|  |  |                         params.put("event_no", eventNo);
 | 
	
		
			
				|  |  |                         try {
 | 
	
		
			
				|  |  |                             ObjectId objectId = GridFSUtil.uploadFile("files", blob, type, params);
 | 
	
		
			
				|  |  |                             jsonObject.put("CONTENT", objectId);
 | 
	
		
			
				|  |  |                         } catch (Exception e) {
 | 
	
		
			
				|  |  |                             e.printStackTrace();
 | 
	
		
			
				|  |  |                         }
 | 
	
		
			
				|  |  |                     }
 | 
	
		
			
				|  |  |                 }
 | 
	
		
			
				|  |  |                 //字典未转换前采集到原始库
 | 
	
		
			
				|  |  |                 boolean b = mongoOrigin.insert(stdDatasetCode, translateDictCN(list, colList, schemeVersion));
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  |                 //字典转换
 | 
	
		
			
				|  |  |                 list = translateDict(list, colList, schemeVersion);
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  |                 //采集到mongodb
 | 
	
		
			
				|  |  |                 b = mongo.insert(stdDatasetCode, list);
 | 
	
		
			
				|  |  |                 if (!b) {
 | 
	
		
			
				|  |  |                     if (mongo.errorMessage != null && mongo.errorMessage.length() > 0) {
 | 
	
		
			
				|  |  |                         System.out.print(mongo.errorMessage);
 | 
	
		
			
				|  |  |                         return mongo.errorMessage;
 | 
	
		
			
				|  |  |                     } else {
 | 
	
		
			
				|  |  |                         return "Mongodb保存失败!(表:" + stdDatasetCode + ")";
 | 
	
		
			
				|  |  |                     }
 | 
	
		
			
				|  |  |                 }
 | 
	
		
			
				|  |  |             }
 | 
	
		
			
				|  |  |         } catch (Exception e) {
 | 
	
		
			
				|  |  |             return e.getMessage();
 | 
	
		
			
				|  |  |         }
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  |         return "";
 | 
	
		
			
				|  |  |     }
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  |     /**
 | 
	
		
			
				|  |  |      * 数据库采集(包含Blob类型数据)
 | 
	
		
			
				|  |  |      *
 | 
	
		
			
				|  |  |      * @param ds
 | 
	
		
			
				|  |  |      * @param schemeVersion
 | 
	
		
			
				|  |  |      * @param logId
 | 
	
		
			
				|  |  |      * @return
 | 
	
		
			
				|  |  |      * @throws Exception
 | 
	
		
			
				|  |  |      */
 | 
	
		
			
				|  |  |     private String collectBlobTable(DtoJobDataset ds, String schemeVersion, String logId) throws Exception {
 | 
	
		
			
				|  |  |         String message = "";
 | 
	
		
			
				|  |  |         String datasetId = ds.getJobDatasetId();
 | 
	
		
			
				|  |  |         String jobDatasetName = ds.getJobDatasetName();
 | 
	
		
			
				|  |  |         String condition = ds.getJobDatasetCondition();
 | 
	
		
			
				|  |  |         String key = ds.getJobDatasetKey();
 | 
	
		
			
				|  |  |         String keytype = ds.getJobDatasetKeytype();
 | 
	
		
			
				|  |  |         String keyvalue = ds.getJobDatasetKeyvalue();
 | 
	
		
			
				|  |  |         String orgCode = ds.getOrgCode();
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  |         String datasourceId = ds.getDatasourceId();
 | 
	
		
			
				|  |  |         String config = ds.getConfig(); //数据库连接
 | 
	
		
			
				|  |  |         DBHelper db = new DBHelper(datasourceId, config);
 | 
	
		
			
				|  |  |         DBType dbType = db.dbType;
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  |         //获取数据集映射
 | 
	
		
			
				|  |  |         List datasetString = stdService.getDatasetByScheme(schemeVersion, datasetId);
 | 
	
		
			
				|  |  |         JSONArray datasetList = new JSONArray(datasetString);
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  |         if (datasetList != null && datasetList.length() > 0) {
 | 
	
		
			
				|  |  |             String stdTableName = datasetList.getJSONObject(0).optString("stdDatasetCode");
 | 
	
		
			
				|  |  |             String adapterTableName = datasetList.getJSONObject(0).optString("adapterDatasetCode");
 | 
	
		
			
				|  |  |             //获取数据集字段映射结构
 | 
	
		
			
				|  |  |             List colString = stdService.getDatacolByScheme(schemeVersion, datasetId);
 | 
	
		
			
				|  |  |             JSONArray colList = new JSONArray(colString);
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  |             if (colList != null && colList.length() > 0) {
 | 
	
		
			
				|  |  |                 //拼接查询sql
 | 
	
		
			
				|  |  |                 String strSql = "Select '" + orgCode + "' as RSCOM_ORG_CODE";
 | 
	
		
			
				|  |  |                 for (int i = 0; i < colList.length(); i++) {
 | 
	
		
			
				|  |  |                     JSONObject col = colList.getJSONObject(i);
 | 
	
		
			
				|  |  |                     String adapterMetadataCode = col.optString("adapterMetadataCode");
 | 
	
		
			
				|  |  |                     if (adapterMetadataCode.length() > 0) {
 | 
	
		
			
				|  |  |                         strSql += "," + adapterMetadataCode + " as " + col.optString("stdMetadataCode");
 | 
	
		
			
				|  |  |                     }
 | 
	
		
			
				|  |  |                 }
 | 
	
		
			
				|  |  |                 strSql += " from " + adapterTableName;
 | 
	
		
			
				|  |  |                 String strWhere = " where 1=1";
 | 
	
		
			
				|  |  |                 //采集范围
 | 
	
		
			
				|  |  |                 if (condition != null && condition.length() > 0) {
 | 
	
		
			
				|  |  |                     strWhere += getConditionSql(dbType, condition);
 | 
	
		
			
				|  |  |                 }
 | 
	
		
			
				|  |  |                 //增量采集
 | 
	
		
			
				|  |  |                 String maxKey = "0";
 | 
	
		
			
				|  |  |                 if (key != null && key.length() > 0) {
 | 
	
		
			
				|  |  |                     maxKey = key;
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  |                     if (keytype.toUpperCase().equals("DATE")) //时间类型
 | 
	
		
			
				|  |  |                     {
 | 
	
		
			
				|  |  |                         if (keyvalue != null && keyvalue.length() > 0) {
 | 
	
		
			
				|  |  |                             Date keyDate = new Date();
 | 
	
		
			
				|  |  |                             //字符串转时间
 | 
	
		
			
				|  |  |                             keyDate = DateConvert.toDate(keyvalue);
 | 
	
		
			
				|  |  |                             //根据数据库类型获取时间sql
 | 
	
		
			
				|  |  |                             strWhere += " and " + maxKey + ">'" + getDateSqlByDBType(dbType, keyDate) + "'";
 | 
	
		
			
				|  |  |                         }
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  |                     } else if (keytype.toUpperCase().equals("VARCHAR")) //字符串类型
 | 
	
		
			
				|  |  |                     {
 | 
	
		
			
				|  |  |                         maxKey = getToNumberSqlByDBType(dbType, key);
 | 
	
		
			
				|  |  |                         if (keyvalue != null && keyvalue.length() > 0) {
 | 
	
		
			
				|  |  |                             strWhere += " and " + maxKey + ">'" + keyvalue + "'";
 | 
	
		
			
				|  |  |                         }
 | 
	
		
			
				|  |  |                     } else {
 | 
	
		
			
				|  |  |                         if (keyvalue != null && keyvalue.length() > 0) {
 | 
	
		
			
				|  |  |                             strWhere += " and " + maxKey + ">'" + keyvalue + "'";
 | 
	
		
			
				|  |  |                         }
 | 
	
		
			
				|  |  |                     }
 | 
	
		
			
				|  |  |                     strWhere += " order by " + maxKey;
 | 
	
		
			
				|  |  |                 }
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  |                 strSql += strWhere;
 | 
	
		
			
				|  |  |                 //总条数
 | 
	
		
			
				|  |  |                 String sqlCount = "select count(1) as COUNT from (" + strSql + ")";
 | 
	
		
			
				|  |  |                 String sqlMax = "select max(" + maxKey + ") as MAX_KEYVALUE from " + adapterTableName + strWhere;
 | 
	
		
			
				|  |  |                 JSONObject objCount = db.load(sqlCount);
 | 
	
		
			
				|  |  |                 if (objCount == null) {
 | 
	
		
			
				|  |  |                     if (db.errorMessage.length() > 0) {
 | 
	
		
			
				|  |  |                         throw new Exception(db.errorMessage);
 | 
	
		
			
				|  |  |                     } else {
 | 
	
		
			
				|  |  |                         throw new Exception("查询异常:" + sqlCount);
 | 
	
		
			
				|  |  |                     }
 | 
	
		
			
				|  |  |                 } else {
 | 
	
		
			
				|  |  |                     int count = objCount.getInt("COUNT");
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  |                     if (count == 0) //0条记录,无需采集
 | 
	
		
			
				|  |  |                     {
 | 
	
		
			
				|  |  |                         message = "0条记录,无需采集。";
 | 
	
		
			
				|  |  |                     } else {
 | 
	
		
			
				|  |  |                         //获取最大值
 | 
	
		
			
				|  |  |                         JSONObject objMax = db.load(sqlMax);
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  |                         int successCount = 0;
 | 
	
		
			
				|  |  |                         String maxKeyvalue = objMax.optString("MAX_KEYVALUE");
 | 
	
		
			
				|  |  |                         //修改最大值
 | 
	
		
			
				|  |  |                         if (maxKeyvalue != null && maxKeyvalue.length() > 0) {
 | 
	
		
			
				|  |  |                             datacollectLogDao.updateJobDatasetKeyvalue(ds.getId(), maxKeyvalue);
 | 
	
		
			
				|  |  |                             logger.info("修改任务数据集最大值为" + maxKeyvalue + "。"); //文本日志
 | 
	
		
			
				|  |  |                         }
 | 
	
		
			
				|  |  |                         int countPage = 1;
 | 
	
		
			
				|  |  |                         if (count > maxNum) //分页采集
 | 
	
		
			
				|  |  |                         {
 | 
	
		
			
				|  |  |                             countPage = count / maxNum + 1;
 | 
	
		
			
				|  |  |                         }
 | 
	
		
			
				|  |  |                         for (int i = 0; i < countPage; i++) {
 | 
	
		
			
				|  |  |                             int rows = maxNum;
 | 
	
		
			
				|  |  |                             if (i + 1 == countPage) {
 | 
	
		
			
				|  |  |                                 rows = count - i * maxNum;
 | 
	
		
			
				|  |  |                             }
 | 
	
		
			
				|  |  |                             String sql = getPageSqlByDBType(dbType, strSql, i * maxNum, rows); //获取分页sql语句
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  |                             RsJobLogDetail detail = new RsJobLogDetail();
 | 
	
		
			
				|  |  |                             detail.setStartTime(new Date());
 | 
	
		
			
				|  |  |                             detail.setJobLogId(logId);
 | 
	
		
			
				|  |  |                             detail.setDatasourceId(datasourceId);
 | 
	
		
			
				|  |  |                             detail.setConfig(config);
 | 
	
		
			
				|  |  |                             detail.setStdDatasetCode(stdTableName);
 | 
	
		
			
				|  |  |                             detail.setJobDatasetId(datasetId);
 | 
	
		
			
				|  |  |                             detail.setJobDatasetName(ds.getJobDatasetName());
 | 
	
		
			
				|  |  |                             detail.setJobId(ds.getJobId());
 | 
	
		
			
				|  |  |                             detail.setJobSql(sql);
 | 
	
		
			
				|  |  |                             detail.setJobNum(i + 1);
 | 
	
		
			
				|  |  |                             detail.setJobDatasetRows(rows);
 | 
	
		
			
				|  |  |                             detail.setSchemeVersion(schemeVersion);
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  |                             List<JSONObject> list = db.query(sql);
 | 
	
		
			
				|  |  |                             String msg = "";
 | 
	
		
			
				|  |  |                             if (list != null) {
 | 
	
		
			
				|  |  |                                 msg = intoMongodb2(list, schemeVersion, stdTableName, colList); //返回信息
 | 
	
		
			
				|  |  |                             } else {
 | 
	
		
			
				|  |  |                                 if (db.errorMessage.length() > 0) {
 | 
	
		
			
				|  |  |                                     msg = db.errorMessage;
 | 
	
		
			
				|  |  |                                 } else {
 | 
	
		
			
				|  |  |                                     msg = "查询数据为空!";
 | 
	
		
			
				|  |  |                                 }
 | 
	
		
			
				|  |  |                             }
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  |                             if (msg.length() > 0) {
 | 
	
		
			
				|  |  |                                 //任务日志细表异常操作
 | 
	
		
			
				|  |  |                                 detail.setJobStatus("0");
 | 
	
		
			
				|  |  |                                 detail.setJobContent(msg);
 | 
	
		
			
				|  |  |                             } else {
 | 
	
		
			
				|  |  |                                 detail.setJobStatus("1");
 | 
	
		
			
				|  |  |                                 detail.setJobContent("采集成功!");
 | 
	
		
			
				|  |  |                                 successCount += rows;
 | 
	
		
			
				|  |  |                             }
 | 
	
		
			
				|  |  |                             detail.setEndTime(new Date());
 | 
	
		
			
				|  |  |                             datacollectLogDao.saveEntity(detail);
 | 
	
		
			
				|  |  |                         }
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  |                         message = jobDatasetName + "采集成功" + successCount + "条数据,总条数" + count + "条。";
 | 
	
		
			
				|  |  |                     }
 | 
	
		
			
				|  |  |                 }
 | 
	
		
			
				|  |  |             } else {
 | 
	
		
			
				|  |  |                 throw new Exception(jobDatasetName + "数据集字段映射为空!");
 | 
	
		
			
				|  |  |             }
 | 
	
		
			
				|  |  |         } else {
 | 
	
		
			
				|  |  |             throw new Exception(jobDatasetName + "数据集映射为空!");
 | 
	
		
			
				|  |  |         }
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  |         return message;
 | 
	
		
			
				|  |  |     }
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  | }
 |