package com.yihu.hos.datacollect.service; import com.fasterxml.jackson.databind.ObjectMapper; import com.yihu.ehr.dbhelper.common.QueryCondition; import com.yihu.ehr.dbhelper.common.enums.DBType; import com.yihu.ehr.dbhelper.common.sqlparser.ParserMysql; import com.yihu.ehr.dbhelper.common.sqlparser.ParserOracle; import com.yihu.ehr.dbhelper.common.sqlparser.ParserSql; import com.yihu.ehr.dbhelper.common.sqlparser.ParserSqlserver; import com.yihu.ehr.dbhelper.jdbc.DBHelper; import com.yihu.ehr.dbhelper.mongodb.MongodbHelper; import com.yihu.hos.common.Services; import com.yihu.hos.core.log.Logger; import com.yihu.hos.core.log.LoggerFactory; import com.yihu.hos.crawler.model.config.SysConfig; import com.yihu.hos.crawler.model.patient.PatientIdentity; import com.yihu.hos.datacollect.dao.intf.IDatacollectDao; import com.yihu.hos.datacollect.dao.intf.IDatacollectLogDao; import com.yihu.hos.datacollect.model.*; import com.yihu.hos.datacollect.service.intf.IDatacollectManager; import com.yihu.hos.datacollect.service.intf.IDatacollectService; import com.yihu.hos.resource.service.IStdService; import com.yihu.hos.web.framework.constant.DateConvert; import com.yihu.hos.web.framework.constant.SqlConstants; import com.yihu.hos.web.framework.model.ActionResult; import com.yihu.hos.web.framework.util.GridFSUtil; import org.bson.types.ObjectId; import org.dom4j.Document; import org.dom4j.Element; import org.dom4j.io.SAXReader; import org.json.JSONArray; import org.json.JSONObject; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; import javax.annotation.Resource; import java.io.ByteArrayInputStream; import java.sql.Blob; import java.text.ParseException; import java.text.SimpleDateFormat; import java.util.*; /** * 数据采集执行服务 */ @Service(Services.DatacollectService) public class DatacollectService implements IDatacollectService { static private final Logger logger = LoggerFactory.getLogger(DatacollectService.class); MongodbHelper mongoOrigin = new MongodbHelper("origin"); MongodbHelper mongo = new MongodbHelper(); String dateFormat = "yyyy-MM-dd HH:mm:ss"; //默认时间字符串格式 int maxNum = 1000; //查询条数限制 @Resource(name = Services.Datacollect) private IDatacollectManager datacollect; @Resource(name = Services.StdService) private IStdService stdService; @Resource(name = "DatacollectDao") private IDatacollectDao datacollectDao; @Resource(name = "DatacollectLogDao") private IDatacollectLogDao datacollectLogDao; @Autowired private ObjectMapper objectMapper; /** * 根据连接字符串获取数据库类型 */ private static DBType getDbType(String uri) { return uri.startsWith("jdbc:mysql") ? DBType.Mysql : (uri.startsWith("jdbc:oracle") ? DBType.Oracle : (uri.startsWith("jdbc:hive2") ? DBType.Hive : (uri.startsWith("jdbc:microsoft:sqlserver") ? DBType.Sqlserver : DBType.Mysql))); } // public static void main(String[] args) throws Exception { // //namespace是命名空间,methodName是方法名 // String sql = "select count(1) as COUNT,max(to_number(HDSD03_01_031)) as MAX_KEYVALUE from HDSC01_02 where 1=1 order by to_number(HDSD03_01_031)"; // //调用web Service//输出调用结果 // System.out.println(WebserviceUtil.request("http://172.19.103.71:8080/service/sql?wsdl", "ExcuteSQL", new Object[]{"", sql})); // // } /** * 执行任务 */ @Override public void executeJob(String jobId) throws Exception { //获取任务详细信息 RsJobConfig job = datacollect.getJobById(jobId); RsJobLog log = new RsJobLog(); log.setJobId(jobId); log.setJobStartTime(new Date()); datacollectLogDao.saveEntity(log); String logId = log.getId(); logger.info("任务" + jobId + "开始采集,新增日志" + logId + "。"); StringBuilder logStr = new StringBuilder(); int count = 0; int success = 0; try { String schemeVersion = job.getSchemeVersion(); //获取任务相关数据集 List list = datacollectDao.getDatacollectDataset(jobId); logger.info("获取任务相关数据集,数量" + list.size() + "。"); if (list != null && list.size() > 0) { count = list.size(); logStr.append("/*********** 开始采集 *******************/\n"); //遍历数据集 for (DtoJobDataset ds : list) { try { String type = ds.getType(); String message = ""; logStr.append(DateConvert.toString(new Date(), dateFormat) + " " + ds.getJobDatasetName()); if (type != null) { if (type.equals("1")) //Web Service { message = collectWebservice(ds, schemeVersion, logId) + "\n"; } else if (type.equals("2"))//文件系统 { message = "文件系统采集。\n"; } else { //数据库 message = collectTable(ds, schemeVersion, logId) + "\n"; } } else { message = ds.getJobDatasetName() + "未关联数据源!\n"; } logger.info(message); //文本日志 logStr.append(message); success++; } catch (Exception ex) { logger.info("异常:" + ex.getMessage()); logStr.append(ex.getMessage() + "\n"); } } logStr.append("/*********** 结束采集 *******************/\n"); } } catch (Exception ex) { ex.printStackTrace(); logger.info("异常:" + ex.getMessage()); logStr.append(ex.getMessage() + "\n"); logStr.append("/*********** 出现异常,中断采集 *******************/\n"); } //任务主日志成功 String jobContent = logStr.toString().replace("\"", "\\\""); if (jobContent.length() > 4000) { jobContent = jobContent.substring(0, 4000); } log.setJobContent(jobContent); log.setJobEndTime(new Date()); log.setJobDatasetCount(count); log.setJobDatasetSuccess(success); logger.info("任务结束," + count + "个数据集成功采集" + success + "个。"); datacollectLogDao.updateEntity(log); } /** * 根据日志详细补采数据 */ @Override @Transactional public ActionResult repeatJob(String id) throws Exception { RsJobLogDetail log = datacollectLogDao.getEntity(RsJobLogDetail.class, id); if (log.getJobStatus().equals("2")) { return new ActionResult(false, "数据补采中!"); } if (!log.getJobStatus().equals("0")) { return new ActionResult(false, "数据无需补采!"); } try { log.setRepeatStartTime(new Date()); log.setJobStatus("2"); //设置采集中状态 datacollectLogDao.updateEntity(log); } catch (Exception e) { return new ActionResult(false, "补采失败!"); } log.setJobStatus("0"); datacollectLogDao.updateEntity(log); String stdDatasetCode = log.getStdDatasetCode(); String sql = log.getJobSql(); //数据库连接 String datasourceId = log.getDatasourceId(); String config = log.getConfig(); DBHelper db = new DBHelper(datasourceId, config); //获取数据集字段映射结构 String schemeVersion = log.getSchemeVersion(); String datasetId = log.getJobDatasetId(); List colString = stdService.getDatacolByScheme(schemeVersion, datasetId); JSONArray colList = new JSONArray(colString); List list = db.query(sql); String message = intoMongodb(list, schemeVersion, stdDatasetCode, colList); if (message.length() > 0 || db.errorMessage.length() > 0) { log.setJobStatus("0"); log.setRepeatEndTime(new Date()); if (message.length() > 0) { log.setRepeatJobContent(message); } else { db.errorMessage.length(); } datacollectLogDao.updateEntity(log); return new ActionResult(false, "补采失败!"); } else { log.setJobStatus("3"); log.setRepeatEndTime(new Date()); log.setRepeatJobContent("补采成功!"); datacollectLogDao.updateEntity(log); return new ActionResult(true, "补采成功!"); } } /** * 根据数据库类型获取时间sql * * @return */ private String getDateSqlByDBType(DBType dbType, Date date) throws Exception { String val = DateConvert.toString(date, dateFormat); if (dbType.equals(DBType.Mysql)) { return "date_format(\'" + val + "\',\'" + dateFormat + "\')"; } else if (dbType.equals(DBType.Oracle)) { return "to_date(\'" + val + "\',\'" + dateFormat + "\')"; } else { return val; } } /** * 根据数据库类型获取转换数值型sql */ private String getToNumberSqlByDBType(DBType dbType, String key) throws Exception { if (dbType.equals(DBType.Mysql)) { return "cast(" + key + " as signed integer)"; } else if (dbType.equals(DBType.Oracle)) { return "to_number(" + key + ")"; } else { return key; } } /** * 根据数据库类型获取分页sql * * @return */ private String getPageSqlByDBType(DBType dbType, String sql, int start, int rows) throws Exception { if (dbType.equals(DBType.Mysql)) { return sql + " LIMIT " + start + "," + rows; } else if (dbType.equals(DBType.Oracle)) { return " select * from (select t.*,ROWNUM RSCOM_RN from (" + sql + ") t where ROWNUM<" + (start + rows + 1) + ") where RSCOM_RN>= " + (start + 1); } else { return sql; } } /** * 字典全转换成中文 */ private List translateDictCN(List list, JSONArray colList, String schemeVersion) throws Exception { //获取字典列表 List dictColList = new ArrayList<>(); for (int i = 0; i < colList.length(); i++) { JSONObject col = colList.getJSONObject(i); String dictId = col.optString("adapterDictId"); if (dictId != null && dictId.length() > 0) { String dictType = col.optString("adapterDataType"); String stdMetadataCode = col.optString("stdMetadataCode"); DtoDictCol dictCol = new DtoDictCol(); dictCol.setStdMetadataCode(stdMetadataCode); dictCol.setStdDictId(dictId); dictCol.setAdapterDataType(dictType.length() > 0 ? dictType : "1");//默认通过code转换字典 //获取字典数据 List dictString = stdService.getDictByScheme(schemeVersion, dictId); JSONArray dictAdapterArray = new JSONArray(dictString); dictCol.setDictList(dictAdapterArray); dictColList.add(dictCol); } } //翻译列表 for (JSONObject data : list) { //遍历字典字段 for (DtoDictCol col : dictColList) { String colNmae = col.getStdMetadataCode(); String oldValue = data.optString(colNmae); String newValue = translateDictValueCN(oldValue, col.getAdapterDataType(), col.getDictList()); if (newValue != null && newValue.length() > 0) { data.put(colNmae, newValue); } } } return list; } /** * 转译字典成中文 * * @return */ private String translateDictValueCN(String oldValue, String type, JSONArray dictAdapterList) throws Exception { if (type.equals("0")) //原本就是值 { return oldValue; } //遍历字典数据(编码->名称) for (int i = 0; i < dictAdapterList.length(); i++) { JSONObject dictItem = dictAdapterList.getJSONObject(i); if (oldValue != null && dictItem.has("stdEntryCode")) { if (oldValue.equals(dictItem.getString("stdEntryCode"))) { String newValue = dictItem.getString("stdEntryValue"); //名称 return newValue; } } } return oldValue; } /** * 字典转换 * * @param list * @param colList * @return * @throws Exception */ private List translateDict(List list, JSONArray colList, String schemeVersion) throws Exception { //获取字典列表 List dictColList = new ArrayList<>(); for (int i = 0; i < colList.length(); i++) { JSONObject col = colList.getJSONObject(i); String dictId = col.optString("adapterDictId"); if (dictId != null && dictId.length() > 0) { String dictType = col.optString("adapterDataType"); String stdMetadataCode = col.optString("stdMetadataCode"); DtoDictCol dictCol = new DtoDictCol(); dictCol.setStdMetadataCode(stdMetadataCode); dictCol.setStdDictId(dictId); dictCol.setAdapterDataType(dictType.length() > 0 ? dictType : "1");//默认通过code转换字典 //获取字典数据 List dictString = stdService.getDictByScheme(schemeVersion, dictId); JSONArray dictAdapterArray = new JSONArray(dictString); dictCol.setDictList(dictAdapterArray); dictColList.add(dictCol); } } //翻译列表 for (JSONObject data : list) { //遍历字典字段 for (DtoDictCol col : dictColList) { String colNmae = col.getStdMetadataCode(); String oldValue = data.optString(colNmae); String newValue = translateDictValue(oldValue, col.getAdapterDataType(), col.getDictList()); if (newValue != null && newValue.length() > 0) { data.put(colNmae, newValue); } } } return list; } /** * 转译字典 * * @return */ private String translateDictValue(String oldValue, String type, JSONArray dictAdapterList) throws Exception { //应用标准字段 String colName = "adapterEntryCode"; if (type.equals("0")) //通过name转译 { colName = "adapterEntryValue"; } //遍历字典数据 for (int i = 0; i < dictAdapterList.length(); i++) { JSONObject dictItem = dictAdapterList.getJSONObject(i); if (oldValue != null && dictItem.has(colName)) { if (oldValue.equals(dictItem.getString(colName))) { String newValue = dictItem.getString("stdEntryCode"); return newValue; } } } //找不到适配字典数据则返回空 return ""; } /** * 获取过滤条件 * * @return */ private String getCondition(DBType dbType, String conditionString) { JSONArray array = new JSONArray(conditionString); if (array != null && array.length() > 0) { List conditions = new ArrayList<>(); for (Object item : array) { JSONObject obj = (JSONObject) item; String logical = obj.getString("andOr"); String operation = obj.getString("condition"); String field = obj.getString("field"); String keyword = obj.getString("value"); conditions.add(new QueryCondition(logical, operation, field, keyword)); } //条件语句转换 ParserSql ps; switch (dbType) { case Oracle: ps = new ParserOracle(); break; case Sqlserver: ps = new ParserSqlserver(); break; default: ps = new ParserMysql(); } return ps.getConditionSql(conditions); } return ""; } /** * 获取条件SQL * * @param dbType * @param conditionString * @return * @throws ParseException */ private String getConditionSql(DBType dbType, String conditionString) throws ParseException { String conditionSql = ""; JSONArray conditions = new JSONArray(conditionString); Iterator iterator = conditions.iterator(); while (iterator.hasNext()) { JSONObject condition = (JSONObject) iterator.next(); String logic = condition.getString("condition"); String andOr = condition.getString("andOr"); String field = condition.getString("field"); String value = condition.getString("value"); String fieldType = condition.getString("type"); String keys = ""; if (andOr.equals(" AND ")) { conditionSql = conditionSql + " and "; } else { conditionSql = conditionSql + " or "; } if (logic.equals(" IN ") || logic.equals(" NOT IN ")) { String[] keywords = value.split(","); for (String key : keywords) { keys += "'" + key + "',"; } keys = " (" + keys.substring(0, keys.length() - 1) + ") "; } else if (logic.equals(" LIKE ")) { keys += " '%" + value + "%' "; } else { if (fieldType.equals("DATE")) { keys += getDateFormatSql(dbType, value); } else { keys += " '" + value + "' "; } } conditionSql += field + logic + keys; } return conditionSql; } /** * 获取对应数据库时间格式 * * @param dbType * @param key * @return * @throws ParseException */ private String getDateFormatSql(DBType dbType, String key) throws ParseException { String dateFormat = "yyyy-MM-dd HH:mm:ss"; SimpleDateFormat formatDate = new SimpleDateFormat("yyyy-MM-dd"); Date d = formatDate.parse(key); SimpleDateFormat format = new SimpleDateFormat(dateFormat); switch (dbType) { case Oracle: key = "to_date(\'" + format.format(d) + "\',\'YYYY-MM-DD HH24:MI:SS\')"; break; case Sqlserver: break; default: key = "date_format(\'" + format.format(d) + "\',\'%y-%m-%d %T\')"; } return key; } /** * 采集入库 * * @return */ private String intoMongodb(List list, String schemeVersion, String stdDatasetCode, JSONArray colList) { String patientIdCode = SqlConstants.PATIENT_ID.toUpperCase(); String eventNoCode = SqlConstants.EVENT_NO.toUpperCase(); PatientIdentity patientIdentity = SysConfig.getInstance().getPatientIdentity(stdDatasetCode); if (patientIdentity != null) { patientIdCode = patientIdentity.getPatientIDCode(); eventNoCode = patientIdentity.getEventNoCode(); } try { if (!mongo.createIndex(stdDatasetCode, "patientIndex", patientIdCode, eventNoCode)) { return "Mongodb索引创建失败!(表:" + stdDatasetCode + ")"; } if (list != null && list.size() > 0) { //字典未转换前采集到原始库 boolean b = mongoOrigin.insert(stdDatasetCode, translateDictCN(list, colList, schemeVersion)); //字典转换 list = translateDict(list, colList, schemeVersion); //采集到mongodb b = mongo.insert(stdDatasetCode, list); if (!b) { if (mongo.errorMessage != null && mongo.errorMessage.length() > 0) { logger.debug(mongo.errorMessage); return mongo.errorMessage; } else { return "Mongodb保存失败!(表:" + stdDatasetCode + ")"; } } } } catch (Exception e) { return e.getMessage(); } return ""; } /** * 数据库表采集 * * @return */ private String collectTable(DtoJobDataset ds, String schemeVersion, String logId) throws Exception { String message = ""; String datasetId = ds.getJobDatasetId(); String jobDatasetName = ds.getJobDatasetName(); String condition = ds.getJobDatasetCondition(); String key = ds.getJobDatasetKey(); String keytype = ds.getJobDatasetKeytype(); String keyvalue = ds.getJobDatasetKeyvalue(); String orgCode = ds.getOrgCode(); String datasourceId = ds.getDatasourceId(); String config = ds.getConfig(); //数据库连接 DBHelper db = new DBHelper(datasourceId, config); DBType dbType = db.dbType; //获取数据集映射 List datasetString = stdService.getDatasetByScheme(schemeVersion, datasetId); JSONArray datasetList = new JSONArray(datasetString); if (datasetList != null && datasetList.length() > 0) { String stdTableName = datasetList.getJSONObject(0).optString("stdDatasetCode"); String adapterTableName = datasetList.getJSONObject(0).optString("adapterDatasetCode"); //获取数据集字段映射结构 List colString = stdService.getDatacolByScheme(schemeVersion, datasetId); JSONArray colList = new JSONArray(colString); if (colList != null && colList.length() > 0) { //拼接查询sql String strSql = "Select '" + orgCode + "' as RSCOM_ORG_CODE"; for (int i = 0; i < colList.length(); i++) { JSONObject col = colList.getJSONObject(i); String adapterMetadataCode = col.optString("adapterMetadataCode"); if (adapterMetadataCode.length() > 0) { strSql += "," + adapterMetadataCode + " as " + col.optString("stdMetadataCode"); } } strSql += " from " + adapterTableName; String strWhere = " where 1=1"; //采集范围 if (condition != null && condition.length() > 0) { strWhere += getConditionSql(dbType, condition); } //增量采集 String maxKey = "0"; if (key != null && key.length() > 0) { maxKey = key; if (keytype.toUpperCase().equals("DATE")) //时间类型 { if (keyvalue != null && keyvalue.length() > 0) { Date keyDate = new Date(); //字符串转时间 keyDate = DateConvert.toDate(keyvalue); //根据数据库类型获取时间sql strWhere += " and " + maxKey + ">'" + getDateSqlByDBType(dbType, keyDate) + "'"; } } else if (keytype.toUpperCase().equals("VARCHAR")) //字符串类型 { maxKey = getToNumberSqlByDBType(dbType, key); if (keyvalue != null && keyvalue.length() > 0) { strWhere += " and " + maxKey + ">'" + keyvalue + "'"; } } else { if (keyvalue != null && keyvalue.length() > 0) { strWhere += " and " + maxKey + ">'" + keyvalue + "'"; } } strWhere += " order by " + maxKey; } strSql += strWhere; //总条数 String sqlCount = "select count(1) as COUNT from (" + strSql + ")"; String sqlMax = "select max(" + maxKey + ") as MAX_KEYVALUE from " + adapterTableName + strWhere; JSONObject objCount = db.load(sqlCount); if (objCount == null) { if (db.errorMessage.length() > 0) { throw new Exception(db.errorMessage); } else { throw new Exception("查询异常:" + sqlCount); } } else { int count = objCount.getInt("COUNT"); if (count == 0) //0条记录,无需采集 { message = "0条记录,无需采集。"; } else { //获取最大值 JSONObject objMax = db.load(sqlMax); int successCount = 0; String maxKeyvalue = objMax.optString("MAX_KEYVALUE"); //修改最大值 if (maxKeyvalue != null && maxKeyvalue.length() > 0) { datacollectLogDao.updateJobDatasetKeyvalue(ds.getId(), maxKeyvalue); logger.info("修改任务数据集最大值为" + maxKeyvalue + "。"); //文本日志 } int countPage = 1; if (count > maxNum) //分页采集 { countPage = count / maxNum + 1; } for (int i = 0; i < countPage; i++) { int rows = maxNum; if (i + 1 == countPage) { rows = count - i * maxNum; } String sql = getPageSqlByDBType(dbType, strSql, i * maxNum, rows); //获取分页sql语句 RsJobLogDetail detail = new RsJobLogDetail(); detail.setStartTime(new Date()); detail.setJobLogId(logId); detail.setDatasourceId(datasourceId); detail.setConfig(config); detail.setStdDatasetCode(stdTableName); detail.setJobDatasetId(datasetId); detail.setJobDatasetName(ds.getJobDatasetName()); detail.setJobId(ds.getJobId()); detail.setJobSql(sql); detail.setJobNum(i + 1); detail.setJobDatasetRows(rows); detail.setSchemeVersion(schemeVersion); List list = db.query(sql); String msg = ""; if (list != null) { msg = intoMongodb(list, schemeVersion, stdTableName, colList); //返回信息 } else { if (db.errorMessage.length() > 0) { msg = db.errorMessage; } else { msg = "查询数据为空!"; } } if (msg.length() > 0) { //任务日志细表异常操作 detail.setJobStatus("0"); detail.setJobContent(msg); logger.info(msg); //文本日志 } else { detail.setJobStatus("1"); detail.setJobContent("采集成功!"); successCount += rows; } detail.setEndTime(new Date()); datacollectLogDao.saveEntity(detail); } message = jobDatasetName + "采集成功" + successCount + "条数据,总条数" + count + "条。"; } } } else { throw new Exception(jobDatasetName + "数据集字段映射为空!"); } } else { throw new Exception(jobDatasetName + "数据集映射为空!"); } logger.info(message); return message; } /** * XML转JSONList * * @return */ private List getListFromXml(String xml) throws Exception { SAXReader reader = new SAXReader(); Document doc = reader.read(new ByteArrayInputStream(xml.getBytes("UTF-8"))); Element root = doc.getRootElement(); List re = new ArrayList<>(); //xml数据列表 Iterator iter = root.elementIterator("Data"); while (iter.hasNext()) { JSONObject obj = new JSONObject(); Element el = (Element) iter.next(); Iterator cols = el.elementIterator(); while (cols.hasNext()) { Element col = (Element) cols.next(); obj.put(col.getName().toUpperCase(), col.getStringValue()); } re.add(obj); } return re; } /** * webservice采集 * * @return */ private String collectWebservice(DtoJobDataset ds, String schemeVersion, String logId) throws Exception { String message = ""; String datasetId = ds.getJobDatasetId(); String jobDatasetName = ds.getJobDatasetName(); String condition = ds.getJobDatasetCondition(); String key = ds.getJobDatasetKey(); String keytype = ds.getJobDatasetKeytype(); String keyvalue = ds.getJobDatasetKeyvalue(); String orgCode = ds.getOrgCode(); String datasourceId = ds.getDatasourceId(); String config = ds.getConfig(); //数据库连接 DBType dbType = DBType.Oracle;//********** 先定死Oracle **************************** //webservice地址 Map mapConfig = objectMapper.readValue(config, Map.class); if (mapConfig.containsKey("protocol") && mapConfig.containsKey("url")) { String url = mapConfig.get("protocol") + "://" + mapConfig.get("url"); //获取数据集映射 List datasetString = stdService.getDatasetByScheme(schemeVersion, datasetId); JSONArray datasetList = new JSONArray(datasetString); if (datasetList != null && datasetList.length() > 0) { String stdTableName = datasetList.getJSONObject(0).optString("stdDatasetCode"); String adapterTableName = datasetList.getJSONObject(0).optString("adapterDatasetCode"); //获取数据集字段映射结构 List colString = stdService.getDatacolByScheme(schemeVersion, datasetId); JSONArray colList = new JSONArray(colString); if (colList != null && colList.length() > 0) { //拼接查询sql String strSql = "Select '" + orgCode + "' as RSCOM_ORG_CODE"; for (int i = 0; i < colList.length(); i++) { JSONObject col = colList.getJSONObject(i); String adapterMetadataCode = col.optString("adapterMetadataCode"); if (adapterMetadataCode.length() > 0) { strSql += "," + adapterMetadataCode + " as " + col.optString("stdMetadataCode"); } } strSql += " from " + adapterTableName; String strWhere = " where 1=1"; //采集范围 if (condition != null && condition.length() > 0) { strWhere += getConditionSql(dbType, condition); } //增量采集 String maxKey = "0"; String keyValue = ds.getJobDatasetKeyvalue(); if (key != null && key.length() > 0) { maxKey = key; if (keytype.toUpperCase().equals("DATE")) //时间类型 { Date keyDate = new Date(); if (keyvalue != null && keyvalue.length() > 0) { //字符串转时间 keyDate = DateConvert.toDate(keyvalue); //根据数据库类型获取时间sql strWhere += " and " + key + ">'" + getDateSqlByDBType(dbType, keyDate) + "'"; strWhere += " order by " + key; } } else if (keytype.toUpperCase().equals("VARCHAR")) //字符串类型 { maxKey = getToNumberSqlByDBType(dbType, key); if (keyvalue != null && keyvalue.length() > 0) { strWhere += " and " + maxKey + ">'" + keyvalue + "'"; strWhere += " order by " + maxKey; } } else { if (keyvalue != null && keyvalue.length() > 0) { strWhere += " and " + key + ">'" + keyvalue + "'"; strWhere += " order by " + key; } } } strSql += strWhere; //总条数和最大值查询 String sqlCount = "select count(1) as COUNT from (" + strSql + ")"; String sqlMax = "select max(" + maxKey + ") as MAX_KEYVALUE from " + adapterTableName + strWhere; //webservice获取数据总条数 String strCount = "";//WebserviceUtil.request(url, "ExcuteSQL", new Object[]{"", sqlCount}); List dataCount = getListFromXml(strCount); if (dataCount != null && dataCount.size() > 0) { Integer count = Integer.parseInt(dataCount.get(0).getString("COUNT")); if (count == 0) //0条记录,无需采集 { message = "0条记录,无需采集。"; } else { //webservice获取最大值 String strMax = ""; //WebserviceUtil.request(url, "ExcuteSQL", new Object[]{"", sqlMax}); List dataMax = getListFromXml(strCount); int successCount = 0; String maxKeyvalue = dataMax.get(0).getString("MAX_KEYVALUE"); //修改最大值 if (maxKeyvalue != null && maxKeyvalue.length() > 0) { datacollectLogDao.updateJobDatasetKeyvalue(ds.getId(), maxKeyvalue); logger.info("修改任务数据集最大值为" + maxKeyvalue + "。"); //文本日志 } int countPage = 1; if (count > maxNum) //分页采集 { countPage = count / maxNum + 1; } for (int i = 0; i < countPage; i++) { int rows = maxNum; if (i + 1 == countPage) { rows = count - i * maxNum; } String sql = getPageSqlByDBType(dbType, strSql, i * maxNum, rows); //获取分页sql语句 RsJobLogDetail detail = new RsJobLogDetail(); detail.setStartTime(new Date()); detail.setJobLogId(logId); detail.setDatasourceId(datasourceId); detail.setConfig(config); detail.setStdDatasetCode(stdTableName); detail.setJobDatasetId(datasetId); detail.setJobDatasetName(ds.getJobDatasetName()); detail.setJobId(ds.getJobId()); detail.setJobSql(sql); detail.setJobNum(i + 1); detail.setJobDatasetRows(rows); detail.setSchemeVersion(schemeVersion); String msg = ""; try { //获取分页数据 String strList = ""; //WebserviceUtil.request(url, "ExcuteSQL", new Object[]{"", sql}); List list = getListFromXml(strList); if (list != null) { msg = intoMongodb(list, schemeVersion, stdTableName, colList); //返回信息 } else { msg = "查询数据为空!"; } if (msg.length() > 0) { //任务日志细表异常操作 detail.setJobStatus("0"); detail.setJobContent(msg); logger.info(msg); //文本日志 } else { detail.setJobStatus("1"); detail.setJobContent("采集成功!"); successCount += rows; } } catch (Exception ex) { msg = ex.getMessage(); } detail.setEndTime(new Date()); datacollectLogDao.saveEntity(detail); } message = jobDatasetName + "采集成功" + successCount + "条数据,总条数" + count + "条。"; } } } else { throw new Exception(jobDatasetName + "数据集字段映射为空!"); } } else { throw new Exception(jobDatasetName + "数据集映射为空!"); } } else { throw new Exception("非法webservice路径!"); } logger.info(message); return message; } /** * 采集入库(包含blob字段处理) * @return */ private String intoMongodb2(List list,String schemeVersion,String stdDatasetCode,JSONArray colList) { String patientIdCode = SqlConstants.PATIENT_ID.toUpperCase(); String eventNoCode = SqlConstants.EVENT_NO.toUpperCase(); PatientIdentity patientIdentity = SysConfig.getInstance().getPatientIdentity(stdDatasetCode); if (patientIdentity != null) { patientIdCode = patientIdentity.getPatientIDCode(); eventNoCode = patientIdentity.getEventNoCode(); } try{ if(!mongo.createIndex(stdDatasetCode, "patientIndex", patientIdCode, eventNoCode)) { return "Mongodb索引创建失败!(表:"+stdDatasetCode+")"; } if(list!=null && list.size()>0) { //TODO TOSET 判断是否是非结构化数据集 if ("unstructured".equals(stdDatasetCode)){ for (JSONObject jsonObject:list) { //文件内容保存到GridFS,细表内容字段保存为文件objctId Blob blob = (Blob) jsonObject.get("CONTENT"); String type = (String) jsonObject.get("FILE_TYPE"); String patientId= (String) jsonObject.get("patient_id"); String eventNo= (String) jsonObject.get("event_no"); Map params = new HashMap<>(); params.put("patient_id",patientId); params.put("event_no",eventNo); try { ObjectId objectId = GridFSUtil.uploadFile("files", blob, type, params); jsonObject.put("CONTENT", objectId); } catch (Exception e) { e.printStackTrace(); } } } //字典未转换前采集到原始库 boolean b = mongoOrigin.insert(stdDatasetCode,translateDictCN(list, colList,schemeVersion)); //字典转换 list = translateDict(list, colList,schemeVersion); //采集到mongodb b = mongo.insert(stdDatasetCode,list); if(!b) { if(mongo.errorMessage!=null && mongo.errorMessage.length()>0) { System.out.print(mongo.errorMessage); return mongo.errorMessage; } else { return "Mongodb保存失败!(表:"+stdDatasetCode+")"; } } } } catch (Exception e) { return e.getMessage(); } return ""; } /** * 数据库采集(包含Blob类型数据) * @param ds * @param schemeVersion * @param logId * @return * @throws Exception */ private String collectBlobTable(DtoJobDataset ds,String schemeVersion,String logId) throws Exception { String message = ""; String datasetId = ds.getJobDatasetId(); String jobDatasetName = ds.getJobDatasetName(); String condition=ds.getJobDatasetCondition(); String key=ds.getJobDatasetKey(); String keytype=ds.getJobDatasetKeytype(); String keyvalue=ds.getJobDatasetKeyvalue(); String orgCode = ds.getOrgCode(); String datasourceId = ds.getDatasourceId(); String config = ds.getConfig(); //数据库连接 DBHelper db = new DBHelper(datasourceId,config); DBType dbType = db.dbType; //获取数据集映射 List datasetString = stdService.getDatasetByScheme(schemeVersion, datasetId); JSONArray datasetList = new JSONArray(datasetString); if(datasetList!=null &&datasetList.length()>0) { String stdTableName = datasetList.getJSONObject(0).optString("stdDatasetCode"); String adapterTableName = datasetList.getJSONObject(0).optString("adapterDatasetCode"); //获取数据集字段映射结构 List colString = stdService.getDatacolByScheme(schemeVersion,datasetId); JSONArray colList = new JSONArray(colString); if(colList!=null && colList.length()>0) { //拼接查询sql String strSql = "Select '" + orgCode +"' as RSCOM_ORG_CODE"; for(int i=0; i< colList.length();i++) { JSONObject col = colList.getJSONObject(i); String adapterMetadataCode = col.optString("adapterMetadataCode"); if(adapterMetadataCode.length()>0) { strSql+= ","+adapterMetadataCode +" as " + col.optString("stdMetadataCode") ; } } strSql += " from " +adapterTableName; String strWhere = " where 1=1"; //采集范围 if(condition!=null && condition.length()>0) { strWhere += getConditionSql(dbType,condition); } //增量采集 String maxKey = "0"; if(key!=null && key.length()>0) { maxKey = key; if(keytype.toUpperCase().equals("DATE")) //时间类型 { if(keyvalue!=null && keyvalue.length()>0) { Date keyDate = new Date(); //字符串转时间 keyDate = DateConvert.toDate(keyvalue); //根据数据库类型获取时间sql strWhere += " and "+ maxKey + ">'"+getDateSqlByDBType(dbType,keyDate)+"'"; } } else if(keytype.toUpperCase().equals("VARCHAR")) //字符串类型 { maxKey = getToNumberSqlByDBType(dbType,key); if(keyvalue!=null && keyvalue.length()>0) { strWhere += " and "+ maxKey + ">'" + keyvalue + "'"; } } else{ if(keyvalue!=null && keyvalue.length()>0) { strWhere += " and "+ maxKey + ">'" + keyvalue + "'"; } } strWhere += " order by " + maxKey; } strSql += strWhere; //总条数 String sqlCount = "select count(1) as COUNT from (" + strSql+")"; String sqlMax = "select max(" + maxKey + ") as MAX_KEYVALUE from " + adapterTableName + strWhere; JSONObject objCount = db.load(sqlCount); if(objCount==null) { if(db.errorMessage.length()>0) { throw new Exception(db.errorMessage); } else{ throw new Exception("查询异常:"+sqlCount); } } else{ int count = objCount.getInt("COUNT"); if(count==0) //0条记录,无需采集 { message = "0条记录,无需采集。"; } else { //获取最大值 JSONObject objMax = db.load(sqlMax); int successCount = 0; String maxKeyvalue = objMax.optString("MAX_KEYVALUE"); //修改最大值 if(maxKeyvalue!=null&& maxKeyvalue.length()>0) { datacollectLogDao.updateJobDatasetKeyvalue(ds.getId(),maxKeyvalue); logger.info("修改任务数据集最大值为"+maxKeyvalue+"。"); //文本日志 } int countPage = 1; if(count > maxNum) //分页采集 { countPage = count/maxNum+1; } for(int i=0;i list = db.query(sql); String msg = ""; if(list!=null) { msg = intoMongodb2(list,schemeVersion,stdTableName,colList); //返回信息 } else{ if(db.errorMessage.length()>0) { msg = db.errorMessage; } else{ msg = "查询数据为空!"; } } if(msg.length()>0) { //任务日志细表异常操作 detail.setJobStatus("0"); detail.setJobContent(msg); } else{ detail.setJobStatus("1"); detail.setJobContent("采集成功!"); successCount += rows; } detail.setEndTime(new Date()); datacollectLogDao.saveEntity(detail); } message = jobDatasetName + "采集成功"+successCount+"条数据,总条数"+count+"条。"; } } } else { throw new Exception(jobDatasetName + "数据集字段映射为空!"); } } else{ throw new Exception(jobDatasetName + "数据集映射为空!"); } return message; } }