12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209 |
- package com.yihu.hos.datacollect.service;
- import com.fasterxml.jackson.databind.ObjectMapper;
- import com.yihu.ehr.dbhelper.common.QueryCondition;
- import com.yihu.ehr.dbhelper.common.enums.DBType;
- import com.yihu.ehr.dbhelper.common.sqlparser.ParserMysql;
- import com.yihu.ehr.dbhelper.common.sqlparser.ParserOracle;
- import com.yihu.ehr.dbhelper.common.sqlparser.ParserSql;
- import com.yihu.ehr.dbhelper.common.sqlparser.ParserSqlserver;
- import com.yihu.ehr.dbhelper.jdbc.DBHelper;
- import com.yihu.ehr.dbhelper.mongodb.MongodbHelper;
- import com.yihu.hos.common.Services;
- import com.yihu.hos.core.log.Logger;
- import com.yihu.hos.core.log.LoggerFactory;
- import com.yihu.hos.crawler.model.config.SysConfig;
- import com.yihu.hos.crawler.model.patient.PatientIdentity;
- import com.yihu.hos.datacollect.dao.intf.IDatacollectDao;
- import com.yihu.hos.datacollect.dao.intf.IDatacollectLogDao;
- import com.yihu.hos.datacollect.model.*;
- import com.yihu.hos.datacollect.service.intf.IDatacollectManager;
- import com.yihu.hos.datacollect.service.intf.IDatacollectService;
- import com.yihu.hos.resource.service.IStdService;
- import com.yihu.hos.web.framework.constant.DateConvert;
- import com.yihu.hos.web.framework.constant.SqlConstants;
- import com.yihu.hos.web.framework.model.ActionResult;
- import com.yihu.hos.web.framework.util.GridFSUtil;
- import org.bson.types.ObjectId;
- import org.dom4j.Document;
- import org.dom4j.Element;
- import org.dom4j.io.SAXReader;
- import org.json.JSONArray;
- import org.json.JSONObject;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.stereotype.Service;
- import org.springframework.transaction.annotation.Transactional;
- import javax.annotation.Resource;
- import java.io.ByteArrayInputStream;
- import java.sql.Blob;
- import java.text.ParseException;
- import java.text.SimpleDateFormat;
- import java.util.*;
- /**
- * 数据采集执行服务
- */
- @Service(Services.DatacollectService)
- public class DatacollectService implements IDatacollectService {
- static private final Logger logger = LoggerFactory.getLogger(DatacollectService.class); // Shared text logger for this service.
- MongodbHelper mongoOrigin = new MongodbHelper("origin"); // Helper created with the "origin" name; intoMongodb inserts rows here before the standard dictionary conversion.
- MongodbHelper mongo = new MongodbHelper(); // Default helper; receives the patient index and the dictionary-converted rows.
- String dateFormat = "yyyy-MM-dd HH:mm:ss"; // Default date-time string format.
- int maxNum = 1000; // Maximum number of rows fetched per query (page size).
- @Resource(name = Services.Datacollect)
- private IDatacollectManager datacollect; // Used by executeJob to load the job configuration.
- @Resource(name = Services.StdService)
- private IStdService stdService; // Supplies dataset, column and dictionary mappings for a scheme version.
- @Resource(name = "DatacollectDao")
- private IDatacollectDao datacollectDao; // Used by executeJob to list the datasets attached to a job.
- @Resource(name = "DatacollectLogDao")
- private IDatacollectLogDao datacollectLogDao; // Persists job logs, per-page log details and the incremental key value.
- @Autowired
- private ObjectMapper objectMapper; // Parses the JSON data-source config in collectWebservice.
- /**
-  * Derives the database type from a JDBC connection string.
-  * <p>
-  * Recognised prefixes: {@code jdbc:mysql}, {@code jdbc:oracle}, {@code jdbc:hive2},
-  * {@code jdbc:microsoft:sqlserver} (legacy SQL Server 2000 driver) and
-  * {@code jdbc:sqlserver} (current Microsoft JDBC driver). Anything else falls
-  * back to {@link DBType#Mysql}.
-  *
-  * @param uri JDBC connection string; must not be null
-  * @return the matching database type, or {@code DBType.Mysql} when no prefix matches
-  */
- private static DBType getDbType(String uri) {
-     if (uri.startsWith("jdbc:mysql")) {
-         return DBType.Mysql;
-     }
-     if (uri.startsWith("jdbc:oracle")) {
-         return DBType.Oracle;
-     }
-     if (uri.startsWith("jdbc:hive2")) {
-         return DBType.Hive;
-     }
-     // Accept both the legacy and the current SQL Server URL prefixes; previously
-     // a "jdbc:sqlserver://..." URL silently fell through to the MySQL default.
-     if (uri.startsWith("jdbc:microsoft:sqlserver") || uri.startsWith("jdbc:sqlserver")) {
-         return DBType.Sqlserver;
-     }
-     return DBType.Mysql;
- }
- // public static void main(String[] args) throws Exception {
- // //namespace是命名空间,methodName是方法名
- // String sql = "select count(1) as COUNT,max(to_number(HDSD03_01_031)) as MAX_KEYVALUE from HDSC01_02 where 1=1 order by to_number(HDSD03_01_031)";
- // //调用web Service//输出调用结果
- // System.out.println(WebserviceUtil.request("http://172.19.103.71:8080/service/sql?wsdl", "ExcuteSQL", new Object[]{"", sql}));
- //
- // }
- /**
- * 执行任务
- */
- @Override
- public void executeJob(String jobId) throws Exception {
- //获取任务详细信息
- RsJobConfig job = datacollect.getJobById(jobId);
- RsJobLog log = new RsJobLog();
- log.setJobId(jobId);
- log.setJobStartTime(new Date());
- datacollectLogDao.saveEntity(log);
- String logId = log.getId();
- logger.info("任务" + jobId + "开始采集,新增日志" + logId + "。");
- StringBuilder logStr = new StringBuilder();
- int count = 0;
- int success = 0;
- try {
- String schemeVersion = job.getSchemeVersion();
- //获取任务相关数据集
- List<DtoJobDataset> list = datacollectDao.getDatacollectDataset(jobId);
- logger.info("获取任务相关数据集,数量" + list.size() + "。");
- if (list != null && list.size() > 0) {
- count = list.size();
- logStr.append("/*********** 开始采集 *******************/\n");
- //遍历数据集
- for (DtoJobDataset ds : list) {
- try {
- String type = ds.getType();
- String message = "";
- logStr.append(DateConvert.toString(new Date(), dateFormat) + " " + ds.getJobDatasetName());
- if (type != null) {
- if (type.equals("1")) //Web Service
- {
- message = collectWebservice(ds, schemeVersion, logId) + "\n";
- } else if (type.equals("2"))//文件系统
- {
- message = "文件系统采集。\n";
- } else { //数据库
- message = collectTable(ds, schemeVersion, logId) + "\n";
- }
- } else {
- message = ds.getJobDatasetName() + "未关联数据源!\n";
- }
- logger.info(message); //文本日志
- logStr.append(message);
- success++;
- } catch (Exception ex) {
- logger.info("异常:" + ex.getMessage());
- logStr.append(ex.getMessage() + "\n");
- }
- }
- logStr.append("/*********** 结束采集 *******************/\n");
- }
- } catch (Exception ex) {
- ex.printStackTrace();
- logger.info("异常:" + ex.getMessage());
- logStr.append(ex.getMessage() + "\n");
- logStr.append("/*********** 出现异常,中断采集 *******************/\n");
- }
- //任务主日志成功
- String jobContent = logStr.toString().replace("\"", "\\\"");
- if (jobContent.length() > 4000) {
- jobContent = jobContent.substring(0, 4000);
- }
- log.setJobContent(jobContent);
- log.setJobEndTime(new Date());
- log.setJobDatasetCount(count);
- log.setJobDatasetSuccess(success);
- logger.info("任务结束," + count + "个数据集成功采集" + success + "个。");
- datacollectLogDao.updateEntity(log);
- }
- /**
- * 根据日志详细补采数据
- */
- @Override
- @Transactional
- public ActionResult repeatJob(String id) throws Exception {
- RsJobLogDetail log = datacollectLogDao.getEntity(RsJobLogDetail.class, id);
- if (log.getJobStatus().equals("2")) {
- return new ActionResult(false, "数据补采中!");
- }
- if (!log.getJobStatus().equals("0")) {
- return new ActionResult(false, "数据无需补采!");
- }
- try {
- log.setRepeatStartTime(new Date());
- log.setJobStatus("2"); //设置采集中状态
- datacollectLogDao.updateEntity(log);
- } catch (Exception e) {
- return new ActionResult(false, "补采失败!");
- }
- log.setJobStatus("0");
- datacollectLogDao.updateEntity(log);
- String stdDatasetCode = log.getStdDatasetCode();
- String sql = log.getJobSql();
- //数据库连接
- String datasourceId = log.getDatasourceId();
- String config = log.getConfig();
- DBHelper db = new DBHelper(datasourceId, config);
- //获取数据集字段映射结构
- String schemeVersion = log.getSchemeVersion();
- String datasetId = log.getJobDatasetId();
- List colString = stdService.getDatacolByScheme(schemeVersion, datasetId);
- JSONArray colList = new JSONArray(colString);
- List<JSONObject> list = db.query(sql);
- String message = intoMongodb(list, schemeVersion, stdDatasetCode, colList);
- if (message.length() > 0 || db.errorMessage.length() > 0) {
- log.setJobStatus("0");
- log.setRepeatEndTime(new Date());
- if (message.length() > 0) {
- log.setRepeatJobContent(message);
- } else {
- db.errorMessage.length();
- }
- datacollectLogDao.updateEntity(log);
- return new ActionResult(false, "补采失败!");
- } else {
- log.setJobStatus("3");
- log.setRepeatEndTime(new Date());
- log.setRepeatJobContent("补采成功!");
- datacollectLogDao.updateEntity(log);
- return new ActionResult(true, "补采成功!");
- }
- }
- /**
-  * Builds the SQL fragment that represents a date for the given database type.
-  * <p>
-  * The date is first rendered with the service-wide {@code dateFormat}
-  * ("yyyy-MM-dd HH:mm:ss"). MySQL and Oracle get a function call whose format
-  * model is written in that database's own syntax (the Java pattern is not a
-  * valid format model for either {@code date_format} or {@code to_date});
-  * every other database gets the bare formatted string.
-  *
-  * @param dbType target database type; must not be null
-  * @param date   date to render
-  * @return SQL expression (MySQL/Oracle) or the plain formatted date string
-  * @throws Exception declared for compatibility with existing callers
-  */
- private String getDateSqlByDBType(DBType dbType, Date date) throws Exception {
-     String val = DateConvert.toString(date, dateFormat);
-     if (dbType.equals(DBType.Mysql)) {
-         // MySQL specifiers equivalent to yyyy-MM-dd HH:mm:ss.
-         return "date_format('" + val + "','%Y-%m-%d %H:%i:%s')";
-     } else if (dbType.equals(DBType.Oracle)) {
-         // Oracle format model equivalent to yyyy-MM-dd HH:mm:ss.
-         return "to_date('" + val + "','YYYY-MM-DD HH24:MI:SS')";
-     } else {
-         return val;
-     }
- }
- /**
-  * Wraps a column expression in the database-specific "convert to number" SQL.
-  *
-  * @param dbType target database type; must not be null
-  * @param key    column name or expression to convert
-  * @return a MySQL {@code cast}, an Oracle {@code to_number}, or {@code key} unchanged for other databases
-  * @throws Exception declared for compatibility with existing callers
-  */
- private String getToNumberSqlByDBType(DBType dbType, String key) throws Exception {
-     if (dbType.equals(DBType.Mysql)) {
-         return "cast(" + key + " as signed integer)";
-     }
-     return dbType.equals(DBType.Oracle) ? "to_number(" + key + ")" : key;
- }
- /**
-  * Turns a query into its paged form for the given database type.
-  *
-  * @param dbType target database type; must not be null
-  * @param sql    query to page
-  * @param start  zero-based offset of the first row
-  * @param rows   number of rows in the page
-  * @return a {@code LIMIT} query for MySQL, a {@code ROWNUM} wrapper for Oracle, otherwise {@code sql} unchanged
-  * @throws Exception declared for compatibility with existing callers
-  */
- private String getPageSqlByDBType(DBType dbType, String sql, int start, int rows) throws Exception {
-     if (dbType.equals(DBType.Mysql)) {
-         return sql + " LIMIT " + start + "," + rows;
-     }
-     if (!dbType.equals(DBType.Oracle)) {
-         return sql;
-     }
-     // Oracle: ROWNUM is 1-based, so the window is [start + 1, start + rows].
-     int upperExclusive = start + rows + 1;
-     int lowerInclusive = start + 1;
-     StringBuilder paged = new StringBuilder();
-     paged.append(" select * from (select t.*,ROWNUM RSCOM_RN from (").append(sql);
-     paged.append(") t where ROWNUM<").append(upperExclusive);
-     paged.append(") where RSCOM_RN>= ").append(lowerInclusive);
-     return paged.toString();
- }
- /**
-  * Replaces dictionary-backed values of every row with their dictionary names.
-  * <p>
-  * Columns whose mapping carries a non-empty {@code adapterDictId} are treated
-  * as dictionary columns; their dictionary entries are loaded from the standard
-  * service once per column. Rows are modified in place.
-  *
-  * @param list          rows to translate (mutated)
-  * @param colList       column mapping entries of the dataset
-  * @param schemeVersion scheme version used to load the dictionaries
-  * @return the same {@code list} instance
-  * @throws Exception if loading or reading the dictionary data fails
-  */
- private List<JSONObject> translateDictCN(List<JSONObject> list, JSONArray colList, String schemeVersion) throws Exception {
-     // Collect the dictionary-backed columns together with their entries.
-     List<DtoDictCol> dictionaryColumns = new ArrayList<>();
-     int columnCount = colList.length();
-     for (int idx = 0; idx < columnCount; idx++) {
-         JSONObject column = colList.getJSONObject(idx);
-         String dictId = column.optString("adapterDictId");
-         if (dictId == null || dictId.length() == 0) {
-             continue;
-         }
-         String declaredType = column.optString("adapterDataType");
-         DtoDictCol entry = new DtoDictCol();
-         entry.setStdMetadataCode(column.optString("stdMetadataCode"));
-         entry.setStdDictId(dictId);
-         // Default "1": translate by code.
-         entry.setAdapterDataType(declaredType.length() > 0 ? declaredType : "1");
-         List dictRows = stdService.getDictByScheme(schemeVersion, dictId);
-         entry.setDictList(new JSONArray(dictRows));
-         dictionaryColumns.add(entry);
-     }
-     // Translate each dictionary column of each row; empty results leave the value untouched.
-     for (JSONObject row : list) {
-         for (DtoDictCol dictCol : dictionaryColumns) {
-             String field = dictCol.getStdMetadataCode();
-             String translated = translateDictValueCN(row.optString(field), dictCol.getAdapterDataType(), dictCol.getDictList());
-             if (translated == null || translated.length() == 0) {
-                 continue;
-             }
-             row.put(field, translated);
-         }
-     }
-     return list;
- }
- /**
-  * Translates one value into its dictionary name.
-  *
-  * @param oldValue        value to translate, may be null
-  * @param type            adapter data type; "0" means the value already is the name
-  * @param dictAdapterList dictionary entries carrying {@code stdEntryCode} / {@code stdEntryValue}
-  * @return the {@code stdEntryValue} of the entry whose {@code stdEntryCode} equals {@code oldValue},
-  *         otherwise {@code oldValue} unchanged
-  * @throws Exception if a dictionary entry cannot be read
-  */
- private String translateDictValueCN(String oldValue, String type, JSONArray dictAdapterList) throws Exception {
-     if (type.equals("0")) {
-         // Already a name, nothing to translate.
-         return oldValue;
-     }
-     // Look up code -> name.
-     int entryCount = dictAdapterList.length();
-     for (int idx = 0; idx < entryCount; idx++) {
-         JSONObject entry = dictAdapterList.getJSONObject(idx);
-         boolean comparable = oldValue != null && entry.has("stdEntryCode");
-         if (comparable && oldValue.equals(entry.getString("stdEntryCode"))) {
-             return entry.getString("stdEntryValue");
-         }
-     }
-     return oldValue;
- }
- /**
-  * Converts dictionary-backed values of every row to their standard codes.
-  * <p>
-  * Columns whose mapping carries a non-empty {@code adapterDictId} are treated
-  * as dictionary columns. A value is only overwritten when the lookup yields a
-  * non-empty result. Rows are modified in place.
-  *
-  * @param list          rows to convert (mutated)
-  * @param colList       column mapping entries of the dataset
-  * @param schemeVersion scheme version used to load the dictionaries
-  * @return the same {@code list} instance
-  * @throws Exception if loading or reading the dictionary data fails
-  */
- private List<JSONObject> translateDict(List<JSONObject> list, JSONArray colList, String schemeVersion) throws Exception {
-     // Collect the dictionary-backed columns together with their entries.
-     List<DtoDictCol> dictionaryColumns = new ArrayList<>();
-     int columnCount = colList.length();
-     for (int idx = 0; idx < columnCount; idx++) {
-         JSONObject column = colList.getJSONObject(idx);
-         String dictId = column.optString("adapterDictId");
-         if (dictId == null || dictId.length() == 0) {
-             continue;
-         }
-         String declaredType = column.optString("adapterDataType");
-         DtoDictCol entry = new DtoDictCol();
-         entry.setStdMetadataCode(column.optString("stdMetadataCode"));
-         entry.setStdDictId(dictId);
-         // Default "1": translate by code.
-         entry.setAdapterDataType(declaredType.length() > 0 ? declaredType : "1");
-         List dictRows = stdService.getDictByScheme(schemeVersion, dictId);
-         entry.setDictList(new JSONArray(dictRows));
-         dictionaryColumns.add(entry);
-     }
-     // Convert each dictionary column of each row.
-     for (JSONObject row : list) {
-         for (DtoDictCol dictCol : dictionaryColumns) {
-             String field = dictCol.getStdMetadataCode();
-             String converted = translateDictValue(row.optString(field), dictCol.getAdapterDataType(), dictCol.getDictList());
-             if (converted == null || converted.length() == 0) {
-                 continue;
-             }
-             row.put(field, converted);
-         }
-     }
-     return list;
- }
- /**
-  * Converts one adapter value to its standard dictionary code.
-  *
-  * @param oldValue        adapter value, may be null
-  * @param type            "0" matches on {@code adapterEntryValue} (by name); anything else
-  *                        matches on {@code adapterEntryCode}
-  * @param dictAdapterList dictionary mapping entries
-  * @return the {@code stdEntryCode} of the first matching entry, or an empty string when nothing matches
-  * @throws Exception if a dictionary entry cannot be read
-  */
- private String translateDictValue(String oldValue, String type, JSONArray dictAdapterList) throws Exception {
-     // Field of the dictionary entry that the incoming value is compared with.
-     String lookupField = type.equals("0") ? "adapterEntryValue" : "adapterEntryCode";
-     int entryCount = dictAdapterList.length();
-     for (int idx = 0; idx < entryCount; idx++) {
-         JSONObject entry = dictAdapterList.getJSONObject(idx);
-         boolean comparable = oldValue != null && entry.has(lookupField);
-         if (comparable && oldValue.equals(entry.getString(lookupField))) {
-             return entry.getString("stdEntryCode");
-         }
-     }
-     // No adapter dictionary entry found.
-     return "";
- }
- /**
-  * Builds the filter SQL from a JSON array of conditions by delegating to the
-  * database-specific {@code ParserSql} implementation.
-  * <p>
-  * Each array element must provide the string keys {@code andOr},
-  * {@code condition}, {@code field} and {@code value}.
-  *
-  * @param dbType          target database type (Oracle and Sqlserver have dedicated parsers,
-  *                        everything else uses the MySQL parser)
-  * @param conditionString JSON array text
-  * @return the condition SQL, or an empty string when the array is empty
-  */
- private String getCondition(DBType dbType, String conditionString) {
-     JSONArray rawConditions = new JSONArray(conditionString);
-     if (rawConditions == null || rawConditions.length() == 0) {
-         return "";
-     }
-     List<QueryCondition> parsed = new ArrayList<>();
-     for (Object element : rawConditions) {
-         JSONObject entry = (JSONObject) element;
-         String logical = entry.getString("andOr");
-         String operation = entry.getString("condition");
-         String field = entry.getString("field");
-         String keyword = entry.getString("value");
-         parsed.add(new QueryCondition(logical, operation, field, keyword));
-     }
-     // Pick the parser that matches the database dialect.
-     ParserSql parser;
-     switch (dbType) {
-         case Oracle:
-             parser = new ParserOracle();
-             break;
-         case Sqlserver:
-             parser = new ParserSqlserver();
-             break;
-         default:
-             parser = new ParserMysql();
-             break;
-     }
-     return parser.getConditionSql(parsed);
- }
- /**
-  * Builds the extra WHERE clauses (each prefixed with " and " / " or ") from a
-  * JSON array of conditions.
-  * <p>
-  * Each element must provide the string keys {@code condition} (operator,
-  * e.g. " IN ", " NOT IN ", " LIKE "), {@code andOr}, {@code field},
-  * {@code value} and {@code type}. String values are emitted as SQL literals
-  * with embedded single quotes doubled, so a value such as {@code O'Brien}
-  * no longer breaks the generated statement.
-  *
-  * @param dbType          target database type, used for DATE values
-  * @param conditionString JSON array text
-  * @return the concatenated condition SQL (empty when the array is empty)
-  * @throws ParseException if a DATE value is not in {@code yyyy-MM-dd} form
-  */
- private String getConditionSql(DBType dbType, String conditionString) throws ParseException {
-     StringBuilder conditionSql = new StringBuilder();
-     JSONArray conditions = new JSONArray(conditionString);
-     for (Object item : conditions) {
-         JSONObject condition = (JSONObject) item;
-         String logic = condition.getString("condition");
-         String andOr = condition.getString("andOr");
-         String field = condition.getString("field");
-         String value = condition.getString("value");
-         String fieldType = condition.getString("type");
-         // Anything other than " AND " is treated as OR.
-         conditionSql.append(andOr.equals(" AND ") ? " and " : " or ");
-         String keys;
-         if (logic.equals(" IN ") || logic.equals(" NOT IN ")) {
-             // Comma separated list -> ('a','b',...).
-             StringBuilder inList = new StringBuilder();
-             for (String keyword : value.split(",")) {
-                 if (inList.length() > 0) {
-                     inList.append(',');
-                 }
-                 inList.append('\'').append(keyword.replace("'", "''")).append('\'');
-             }
-             keys = " (" + inList + ") ";
-         } else if (logic.equals(" LIKE ")) {
-             keys = " '%" + value.replace("'", "''") + "%' ";
-         } else if (fieldType.equals("DATE")) {
-             keys = getDateFormatSql(dbType, value);
-         } else {
-             keys = " '" + value.replace("'", "''") + "' ";
-         }
-         conditionSql.append(field).append(logic).append(keys);
-     }
-     return conditionSql.toString();
- }
- /**
-  * Renders a {@code yyyy-MM-dd} date string as a date expression/literal for
-  * the given database type.
-  * <ul>
-  * <li>Oracle: {@code to_date('yyyy-MM-dd HH:mm:ss','YYYY-MM-DD HH24:MI:SS')}</li>
-  * <li>Sqlserver: a quoted ISO 8601 literal ({@code 'yyyy-MM-ddTHH:mm:ss'}); the
-  * raw, unquoted input used before would be parsed as integer arithmetic</li>
-  * <li>others: MySQL {@code date_format} with a four-digit year ({@code %Y});
-  * the previous {@code %y} produced a two-digit year that MySQL maps into
-  * 1970-2069</li>
-  * </ul>
-  *
-  * @param dbType target database type; must not be null
-  * @param key    date text in {@code yyyy-MM-dd} form
-  * @return SQL fragment representing that date
-  * @throws ParseException if {@code key} cannot be parsed
-  */
- private String getDateFormatSql(DBType dbType, String key) throws ParseException {
-     // SimpleDateFormat is not thread-safe, so instances are created per call.
-     Date parsed = new SimpleDateFormat("yyyy-MM-dd").parse(key);
-     String formatted = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(parsed);
-     switch (dbType) {
-         case Oracle:
-             return "to_date('" + formatted + "','YYYY-MM-DD HH24:MI:SS')";
-         case Sqlserver:
-             return "'" + new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss").format(parsed) + "'";
-         default:
-             return "date_format('" + formatted + "','%Y-%m-%d %T')";
-     }
- }
- /**
- * 采集入库
- *
- * @return
- */
- private String intoMongodb(List<JSONObject> list, String schemeVersion, String stdDatasetCode, JSONArray colList) {
- String patientIdCode = SqlConstants.PATIENT_ID.toUpperCase();
- String eventNoCode = SqlConstants.EVENT_NO.toUpperCase();
- PatientIdentity patientIdentity = SysConfig.getInstance().getPatientIdentity(stdDatasetCode);
- if (patientIdentity != null) {
- patientIdCode = patientIdentity.getPatientIDCode();
- eventNoCode = patientIdentity.getEventNoCode();
- }
- try {
- if (!mongo.createIndex(stdDatasetCode, "patientIndex", patientIdCode, eventNoCode)) {
- return "Mongodb索引创建失败!(表:" + stdDatasetCode + ")";
- }
- if (list != null && list.size() > 0) {
- //字典未转换前采集到原始库
- boolean b = mongoOrigin.insert(stdDatasetCode, translateDictCN(list, colList, schemeVersion));
- //字典转换
- list = translateDict(list, colList, schemeVersion);
- //采集到mongodb
- b = mongo.insert(stdDatasetCode, list);
- if (!b) {
- if (mongo.errorMessage != null && mongo.errorMessage.length() > 0) {
- logger.debug(mongo.errorMessage);
- return mongo.errorMessage;
- } else {
- return "Mongodb保存失败!(表:" + stdDatasetCode + ")";
- }
- }
- }
- } catch (Exception e) {
- return e.getMessage();
- }
- return "";
- }
- /**
-  * Collects one dataset from a database table.
-  * <p>
-  * Builds a SELECT from the column mapping, applies the configured filter and
-  * the incremental key (DATE, VARCHAR-as-number or plain), stores the new
-  * maximum key value, then reads the rows page by page ({@code maxNum} rows per
-  * page) into MongoDB. One {@code RsJobLogDetail} row is saved per page.
-  * <p>
-  * Note that the maximum key value is persisted before the pages are read.
-  *
-  * @param ds            job dataset configuration
-  * @param schemeVersion scheme version used for the mappings
-  * @param logId         id of the job log the detail rows belong to
-  * @return a summary message for the job log
-  * @throws Exception if a mapping is empty or the count query fails
-  */
- private String collectTable(DtoJobDataset ds, String schemeVersion, String logId) throws Exception {
-     String message = "";
-     String datasetId = ds.getJobDatasetId();
-     String jobDatasetName = ds.getJobDatasetName();
-     String condition = ds.getJobDatasetCondition();
-     String key = ds.getJobDatasetKey();
-     String keytype = ds.getJobDatasetKeytype();
-     String keyvalue = ds.getJobDatasetKeyvalue();
-     String orgCode = ds.getOrgCode();
-     String datasourceId = ds.getDatasourceId();
-     String config = ds.getConfig(); // database connection settings
-     DBHelper db = new DBHelper(datasourceId, config);
-     DBType dbType = db.dbType;
-     // Dataset mapping.
-     List datasetString = stdService.getDatasetByScheme(schemeVersion, datasetId);
-     JSONArray datasetList = new JSONArray(datasetString);
-     if (datasetList == null || datasetList.length() == 0) {
-         throw new Exception(jobDatasetName + "数据集映射为空!");
-     }
-     String stdTableName = datasetList.getJSONObject(0).optString("stdDatasetCode");
-     String adapterTableName = datasetList.getJSONObject(0).optString("adapterDatasetCode");
-     // Column mapping.
-     List colString = stdService.getDatacolByScheme(schemeVersion, datasetId);
-     JSONArray colList = new JSONArray(colString);
-     if (colList == null || colList.length() == 0) {
-         throw new Exception(jobDatasetName + "数据集字段映射为空!");
-     }
-     // Build the SELECT list: adapter column aliased to the standard column code.
-     String strSql = "Select '" + orgCode + "' as RSCOM_ORG_CODE";
-     for (int i = 0; i < colList.length(); i++) {
-         JSONObject col = colList.getJSONObject(i);
-         String adapterMetadataCode = col.optString("adapterMetadataCode");
-         if (adapterMetadataCode.length() > 0) {
-             strSql += "," + adapterMetadataCode + " as " + col.optString("stdMetadataCode");
-         }
-     }
-     strSql += " from " + adapterTableName;
-     String strWhere = " where 1=1";
-     // Collection range filter.
-     if (condition != null && condition.length() > 0) {
-         strWhere += getConditionSql(dbType, condition);
-     }
-     // Incremental collection on the configured key.
-     String maxKey = "0";
-     if (key != null && key.length() > 0) {
-         maxKey = key;
-         boolean hasKeyValue = keyvalue != null && keyvalue.length() > 0;
-         if (keytype.toUpperCase().equals("DATE")) {
-             if (hasKeyValue) {
-                 Date keyDate = DateConvert.toDate(keyvalue);
-                 String dateSql = getDateSqlByDBType(dbType, keyDate);
-                 // MySQL/Oracle get a function call, which must not be wrapped in quotes;
-                 // other databases get a bare date string, which must be quoted.
-                 boolean sqlExpression = dbType.equals(DBType.Mysql) || dbType.equals(DBType.Oracle);
-                 strWhere += " and " + maxKey + ">" + (sqlExpression ? dateSql : "'" + dateSql + "'");
-             }
-         } else if (keytype.toUpperCase().equals("VARCHAR")) {
-             // String key compared as a number.
-             maxKey = getToNumberSqlByDBType(dbType, key);
-             if (hasKeyValue) {
-                 strWhere += " and " + maxKey + ">'" + keyvalue + "'";
-             }
-         } else {
-             if (hasKeyValue) {
-                 strWhere += " and " + maxKey + ">'" + keyvalue + "'";
-             }
-         }
-         strWhere += " order by " + maxKey;
-     }
-     strSql += strWhere;
-     // Total row count; the derived table needs an alias (mandatory on MySQL).
-     String sqlCount = "select count(1) as COUNT from (" + strSql + ") RSCOM_CNT";
-     String sqlMax = "select max(" + maxKey + ") as MAX_KEYVALUE from " + adapterTableName + strWhere;
-     JSONObject objCount = db.load(sqlCount);
-     if (objCount == null) {
-         if (db.errorMessage.length() > 0) {
-             throw new Exception(db.errorMessage);
-         } else {
-             throw new Exception("查询异常:" + sqlCount);
-         }
-     }
-     int count = objCount.getInt("COUNT");
-     if (count == 0) {
-         // Nothing to collect.
-         message = "0条记录,无需采集。";
-     } else {
-         // Persist the new maximum key value.
-         JSONObject objMax = db.load(sqlMax);
-         int successCount = 0;
-         String maxKeyvalue = objMax == null ? "" : objMax.optString("MAX_KEYVALUE");
-         if (maxKeyvalue != null && maxKeyvalue.length() > 0) {
-             datacollectLogDao.updateJobDatasetKeyvalue(ds.getId(), maxKeyvalue);
-             logger.info("修改任务数据集最大值为" + maxKeyvalue + "。"); // text log
-         }
-         // Ceiling division: an exact multiple of maxNum must not yield an extra empty page.
-         int countPage = 1;
-         if (count > maxNum) {
-             countPage = (count + maxNum - 1) / maxNum;
-         }
-         for (int i = 0; i < countPage; i++) {
-             int rows = maxNum;
-             if (i + 1 == countPage) {
-                 rows = count - i * maxNum;
-             }
-             String sql = getPageSqlByDBType(dbType, strSql, i * maxNum, rows); // paged query
-             RsJobLogDetail detail = new RsJobLogDetail();
-             detail.setStartTime(new Date());
-             detail.setJobLogId(logId);
-             detail.setDatasourceId(datasourceId);
-             detail.setConfig(config);
-             detail.setStdDatasetCode(stdTableName);
-             detail.setJobDatasetId(datasetId);
-             detail.setJobDatasetName(ds.getJobDatasetName());
-             detail.setJobId(ds.getJobId());
-             detail.setJobSql(sql);
-             detail.setJobNum(i + 1);
-             detail.setJobDatasetRows(rows);
-             detail.setSchemeVersion(schemeVersion);
-             List<JSONObject> list = db.query(sql);
-             String msg = "";
-             if (list != null) {
-                 msg = intoMongodb(list, schemeVersion, stdTableName, colList); // "" means success
-             } else {
-                 if (db.errorMessage.length() > 0) {
-                     msg = db.errorMessage;
-                 } else {
-                     msg = "查询数据为空!";
-                 }
-             }
-             if (msg.length() > 0) {
-                 // Failed page: status "0" keeps it eligible for re-collection.
-                 detail.setJobStatus("0");
-                 detail.setJobContent(msg);
-                 logger.info(msg); // text log
-             } else {
-                 detail.setJobStatus("1");
-                 detail.setJobContent("采集成功!");
-                 successCount += rows;
-             }
-             detail.setEndTime(new Date());
-             datacollectLogDao.saveEntity(detail);
-         }
-         message = jobDatasetName + "采集成功" + successCount + "条数据,总条数" + count + "条。";
-     }
-     logger.info(message);
-     return message;
- }
- /**
-  * Converts an XML document into a list of JSON objects.
-  * <p>
-  * Every {@code Data} child of the root element becomes one JSON object; each
-  * of its child elements becomes a property whose key is the upper-cased
-  * element name and whose value is the element's string value.
-  *
-  * @param xml XML text (encoded as UTF-8 for parsing)
-  * @return one JSON object per {@code Data} element, possibly empty
-  * @throws Exception if the XML cannot be parsed
-  */
- private List<JSONObject> getListFromXml(String xml) throws Exception {
-     SAXReader reader = new SAXReader();
-     // The XML comes from a remote service: do not resolve external entities (XXE).
-     reader.setFeature("http://xml.org/sax/features/external-general-entities", false);
-     reader.setFeature("http://xml.org/sax/features/external-parameter-entities", false);
-     Document doc = reader.read(new ByteArrayInputStream(xml.getBytes("UTF-8")));
-     Element root = doc.getRootElement();
-     List<JSONObject> re = new ArrayList<>();
-     // Data rows of the document.
-     Iterator iter = root.elementIterator("Data");
-     while (iter.hasNext()) {
-         JSONObject obj = new JSONObject();
-         Element el = (Element) iter.next();
-         Iterator cols = el.elementIterator();
-         while (cols.hasNext()) {
-             Element col = (Element) cols.next();
-             obj.put(col.getName().toUpperCase(), col.getStringValue());
-         }
-         re.add(obj);
-     }
-     return re;
- }
- /**
- * webservice采集
- *
- * @return
- */
- private String collectWebservice(DtoJobDataset ds, String schemeVersion, String logId) throws Exception {
- String message = "";
- String datasetId = ds.getJobDatasetId();
- String jobDatasetName = ds.getJobDatasetName();
- String condition = ds.getJobDatasetCondition();
- String key = ds.getJobDatasetKey();
- String keytype = ds.getJobDatasetKeytype();
- String keyvalue = ds.getJobDatasetKeyvalue();
- String orgCode = ds.getOrgCode();
- String datasourceId = ds.getDatasourceId();
- String config = ds.getConfig(); //数据库连接
- DBType dbType = DBType.Oracle;//********** 先定死Oracle ****************************
- //webservice地址
- Map<String, String> mapConfig = objectMapper.readValue(config, Map.class);
- if (mapConfig.containsKey("protocol") && mapConfig.containsKey("url")) {
- String url = mapConfig.get("protocol") + "://" + mapConfig.get("url");
- //获取数据集映射
- List datasetString = stdService.getDatasetByScheme(schemeVersion, datasetId);
- JSONArray datasetList = new JSONArray(datasetString);
- if (datasetList != null && datasetList.length() > 0) {
- String stdTableName = datasetList.getJSONObject(0).optString("stdDatasetCode");
- String adapterTableName = datasetList.getJSONObject(0).optString("adapterDatasetCode");
- //获取数据集字段映射结构
- List colString = stdService.getDatacolByScheme(schemeVersion, datasetId);
- JSONArray colList = new JSONArray(colString);
- if (colList != null && colList.length() > 0) {
- //拼接查询sql
- String strSql = "Select '" + orgCode + "' as RSCOM_ORG_CODE";
- for (int i = 0; i < colList.length(); i++) {
- JSONObject col = colList.getJSONObject(i);
- String adapterMetadataCode = col.optString("adapterMetadataCode");
- if (adapterMetadataCode.length() > 0) {
- strSql += "," + adapterMetadataCode + " as " + col.optString("stdMetadataCode");
- }
- }
- strSql += " from " + adapterTableName;
- String strWhere = " where 1=1";
- //采集范围
- if (condition != null && condition.length() > 0) {
- strWhere += getConditionSql(dbType, condition);
- }
- //增量采集
- String maxKey = "0";
- String keyValue = ds.getJobDatasetKeyvalue();
- if (key != null && key.length() > 0) {
- maxKey = key;
- if (keytype.toUpperCase().equals("DATE")) //时间类型
- {
- Date keyDate = new Date();
- if (keyvalue != null && keyvalue.length() > 0) {
- //字符串转时间
- keyDate = DateConvert.toDate(keyvalue);
- //根据数据库类型获取时间sql
- strWhere += " and " + key + ">'" + getDateSqlByDBType(dbType, keyDate) + "'";
- strWhere += " order by " + key;
- }
- } else if (keytype.toUpperCase().equals("VARCHAR")) //字符串类型
- {
- maxKey = getToNumberSqlByDBType(dbType, key);
- if (keyvalue != null && keyvalue.length() > 0) {
- strWhere += " and " + maxKey + ">'" + keyvalue + "'";
- strWhere += " order by " + maxKey;
- }
- } else {
- if (keyvalue != null && keyvalue.length() > 0) {
- strWhere += " and " + key + ">'" + keyvalue + "'";
- strWhere += " order by " + key;
- }
- }
- }
- strSql += strWhere;
- //总条数和最大值查询
- String sqlCount = "select count(1) as COUNT from (" + strSql + ")";
- String sqlMax = "select max(" + maxKey + ") as MAX_KEYVALUE from " + adapterTableName + strWhere;
- //webservice获取数据总条数
- String strCount = "";//WebserviceUtil.request(url, "ExcuteSQL", new Object[]{"", sqlCount});
- List<JSONObject> dataCount = getListFromXml(strCount);
- if (dataCount != null && dataCount.size() > 0) {
- Integer count = Integer.parseInt(dataCount.get(0).getString("COUNT"));
- if (count == 0) //0条记录,无需采集
- {
- message = "0条记录,无需采集。";
- } else {
- //webservice获取最大值
- String strMax = ""; //WebserviceUtil.request(url, "ExcuteSQL", new Object[]{"", sqlMax});
- List<JSONObject> dataMax = getListFromXml(strCount);
- int successCount = 0;
- String maxKeyvalue = dataMax.get(0).getString("MAX_KEYVALUE");
- //修改最大值
- if (maxKeyvalue != null && maxKeyvalue.length() > 0) {
- datacollectLogDao.updateJobDatasetKeyvalue(ds.getId(), maxKeyvalue);
- logger.info("修改任务数据集最大值为" + maxKeyvalue + "。"); //文本日志
- }
- int countPage = 1;
- if (count > maxNum) //分页采集
- {
- countPage = count / maxNum + 1;
- }
- for (int i = 0; i < countPage; i++) {
- int rows = maxNum;
- if (i + 1 == countPage) {
- rows = count - i * maxNum;
- }
- String sql = getPageSqlByDBType(dbType, strSql, i * maxNum, rows); //获取分页sql语句
- RsJobLogDetail detail = new RsJobLogDetail();
- detail.setStartTime(new Date());
- detail.setJobLogId(logId);
- detail.setDatasourceId(datasourceId);
- detail.setConfig(config);
- detail.setStdDatasetCode(stdTableName);
- detail.setJobDatasetId(datasetId);
- detail.setJobDatasetName(ds.getJobDatasetName());
- detail.setJobId(ds.getJobId());
- detail.setJobSql(sql);
- detail.setJobNum(i + 1);
- detail.setJobDatasetRows(rows);
- detail.setSchemeVersion(schemeVersion);
- String msg = "";
- try {
- //获取分页数据
- String strList = ""; //WebserviceUtil.request(url, "ExcuteSQL", new Object[]{"", sql});
- List<JSONObject> list = getListFromXml(strList);
- if (list != null) {
- msg = intoMongodb(list, schemeVersion, stdTableName, colList); //返回信息
- } else {
- msg = "查询数据为空!";
- }
- if (msg.length() > 0) {
- //任务日志细表异常操作
- detail.setJobStatus("0");
- detail.setJobContent(msg);
- logger.info(msg); //文本日志
- } else {
- detail.setJobStatus("1");
- detail.setJobContent("采集成功!");
- successCount += rows;
- }
- } catch (Exception ex) {
- msg = ex.getMessage();
- }
- detail.setEndTime(new Date());
- datacollectLogDao.saveEntity(detail);
- }
- message = jobDatasetName + "采集成功" + successCount + "条数据,总条数" + count + "条。";
- }
- }
- } else {
- throw new Exception(jobDatasetName + "数据集字段映射为空!");
- }
- } else {
- throw new Exception(jobDatasetName + "数据集映射为空!");
- }
- } else {
- throw new Exception("非法webservice路径!");
- }
- logger.info(message);
- return message;
- }
- /**
- * 采集入库(包含blob字段处理)
- * @return
- */
- private String intoMongodb2(List<JSONObject> list,String schemeVersion,String stdDatasetCode,JSONArray colList)
- {
- String patientIdCode = SqlConstants.PATIENT_ID.toUpperCase();
- String eventNoCode = SqlConstants.EVENT_NO.toUpperCase();
- PatientIdentity patientIdentity = SysConfig.getInstance().getPatientIdentity(stdDatasetCode);
- if (patientIdentity != null) {
- patientIdCode = patientIdentity.getPatientIDCode();
- eventNoCode = patientIdentity.getEventNoCode();
- }
- try{
- if(!mongo.createIndex(stdDatasetCode, "patientIndex", patientIdCode, eventNoCode)) {
- return "Mongodb索引创建失败!(表:"+stdDatasetCode+")";
- }
- if(list!=null && list.size()>0)
- {
- //TODO TOSET 判断是否是非结构化数据集
- if ("unstructured".equals(stdDatasetCode)){
- for (JSONObject jsonObject:list) {
- //文件内容保存到GridFS,细表内容字段保存为文件objctId
- Blob blob = (Blob) jsonObject.get("CONTENT");
- String type = (String) jsonObject.get("FILE_TYPE");
- String patientId= (String) jsonObject.get("patient_id");
- String eventNo= (String) jsonObject.get("event_no");
- Map<String,Object> params = new HashMap<>();
- params.put("patient_id",patientId);
- params.put("event_no",eventNo);
- try {
- ObjectId objectId = GridFSUtil.uploadFile("files", blob, type, params);
- jsonObject.put("CONTENT", objectId);
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }
- //字典未转换前采集到原始库
- boolean b = mongoOrigin.insert(stdDatasetCode,translateDictCN(list, colList,schemeVersion));
- //字典转换
- list = translateDict(list, colList,schemeVersion);
- //采集到mongodb
- b = mongo.insert(stdDatasetCode,list);
- if(!b)
- {
- if(mongo.errorMessage!=null && mongo.errorMessage.length()>0)
- {
- System.out.print(mongo.errorMessage);
- return mongo.errorMessage;
- }
- else {
- return "Mongodb保存失败!(表:"+stdDatasetCode+")";
- }
- }
- }
- }
- catch (Exception e)
- {
- return e.getMessage();
- }
- return "";
- }
- /**
- * 数据库采集(包含Blob类型数据)
- * @param ds
- * @param schemeVersion
- * @param logId
- * @return
- * @throws Exception
- */
- private String collectBlobTable(DtoJobDataset ds,String schemeVersion,String logId) throws Exception
- {
- String message = "";
- String datasetId = ds.getJobDatasetId();
- String jobDatasetName = ds.getJobDatasetName();
- String condition=ds.getJobDatasetCondition();
- String key=ds.getJobDatasetKey();
- String keytype=ds.getJobDatasetKeytype();
- String keyvalue=ds.getJobDatasetKeyvalue();
- String orgCode = ds.getOrgCode();
- String datasourceId = ds.getDatasourceId();
- String config = ds.getConfig(); //数据库连接
- DBHelper db = new DBHelper(datasourceId,config);
- DBType dbType = db.dbType;
- //获取数据集映射
- List datasetString = stdService.getDatasetByScheme(schemeVersion, datasetId);
- JSONArray datasetList = new JSONArray(datasetString);
- if(datasetList!=null &&datasetList.length()>0)
- {
- String stdTableName = datasetList.getJSONObject(0).optString("stdDatasetCode");
- String adapterTableName = datasetList.getJSONObject(0).optString("adapterDatasetCode");
- //获取数据集字段映射结构
- List colString = stdService.getDatacolByScheme(schemeVersion,datasetId);
- JSONArray colList = new JSONArray(colString);
- if(colList!=null && colList.length()>0)
- {
- //拼接查询sql
- String strSql = "Select '" + orgCode +"' as RSCOM_ORG_CODE";
- for(int i=0; i< colList.length();i++)
- {
- JSONObject col = colList.getJSONObject(i);
- String adapterMetadataCode = col.optString("adapterMetadataCode");
- if(adapterMetadataCode.length()>0)
- {
- strSql+= ","+adapterMetadataCode +" as " + col.optString("stdMetadataCode") ;
- }
- }
- strSql += " from " +adapterTableName;
- String strWhere = " where 1=1";
- //采集范围
- if(condition!=null && condition.length()>0)
- {
- strWhere += getConditionSql(dbType,condition);
- }
- //增量采集
- String maxKey = "0";
- if(key!=null && key.length()>0)
- {
- maxKey = key;
- if(keytype.toUpperCase().equals("DATE")) //时间类型
- {
- if(keyvalue!=null && keyvalue.length()>0) {
- Date keyDate = new Date();
- //字符串转时间
- keyDate = DateConvert.toDate(keyvalue);
- //根据数据库类型获取时间sql
- strWhere += " and "+ maxKey + ">'"+getDateSqlByDBType(dbType,keyDate)+"'";
- }
- }
- else if(keytype.toUpperCase().equals("VARCHAR")) //字符串类型
- {
- maxKey = getToNumberSqlByDBType(dbType,key);
- if(keyvalue!=null && keyvalue.length()>0) {
- strWhere += " and "+ maxKey + ">'" + keyvalue + "'";
- }
- }
- else{
- if(keyvalue!=null && keyvalue.length()>0) {
- strWhere += " and "+ maxKey + ">'" + keyvalue + "'";
- }
- }
- strWhere += " order by " + maxKey;
- }
- strSql += strWhere;
- //总条数
- String sqlCount = "select count(1) as COUNT from (" + strSql+")";
- String sqlMax = "select max(" + maxKey + ") as MAX_KEYVALUE from " + adapterTableName + strWhere;
- JSONObject objCount = db.load(sqlCount);
- if(objCount==null)
- {
- if(db.errorMessage.length()>0)
- {
- throw new Exception(db.errorMessage);
- }
- else{
- throw new Exception("查询异常:"+sqlCount);
- }
- }
- else{
- int count = objCount.getInt("COUNT");
- if(count==0) //0条记录,无需采集
- {
- message = "0条记录,无需采集。";
- }
- else
- {
- //获取最大值
- JSONObject objMax = db.load(sqlMax);
- int successCount = 0;
- String maxKeyvalue = objMax.optString("MAX_KEYVALUE");
- //修改最大值
- if(maxKeyvalue!=null&& maxKeyvalue.length()>0)
- {
- datacollectLogDao.updateJobDatasetKeyvalue(ds.getId(),maxKeyvalue);
- logger.info("修改任务数据集最大值为"+maxKeyvalue+"。"); //文本日志
- }
- int countPage = 1;
- if(count > maxNum) //分页采集
- {
- countPage = count/maxNum+1;
- }
- for(int i=0;i<countPage;i++)
- {
- int rows = maxNum;
- if(i+1==countPage){
- rows = count-i*maxNum;
- }
- String sql = getPageSqlByDBType(dbType,strSql,i*maxNum,rows); //获取分页sql语句
- RsJobLogDetail detail = new RsJobLogDetail();
- detail.setStartTime(new Date());
- detail.setJobLogId(logId);
- detail.setDatasourceId(datasourceId);
- detail.setConfig(config);
- detail.setStdDatasetCode(stdTableName);
- detail.setJobDatasetId(datasetId);
- detail.setJobDatasetName(ds.getJobDatasetName());
- detail.setJobId(ds.getJobId());
- detail.setJobSql(sql);
- detail.setJobNum(i+1);
- detail.setJobDatasetRows(rows);
- detail.setSchemeVersion(schemeVersion);
- List<JSONObject> list = db.query(sql);
- String msg = "";
- if(list!=null)
- {
- msg = intoMongodb2(list,schemeVersion,stdTableName,colList); //返回信息
- }
- else{
- if(db.errorMessage.length()>0)
- {
- msg = db.errorMessage;
- }
- else{
- msg = "查询数据为空!";
- }
- }
- if(msg.length()>0)
- {
- //任务日志细表异常操作
- detail.setJobStatus("0");
- detail.setJobContent(msg);
- }
- else{
- detail.setJobStatus("1");
- detail.setJobContent("采集成功!");
- successCount += rows;
- }
- detail.setEndTime(new Date());
- datacollectLogDao.saveEntity(detail);
- }
- message = jobDatasetName + "采集成功"+successCount+"条数据,总条数"+count+"条。";
- }
- }
- }
- else
- {
- throw new Exception(jobDatasetName + "数据集字段映射为空!");
- }
- }
- else{
- throw new Exception(jobDatasetName + "数据集映射为空!");
- }
- return message;
- }
-
- }
|