DatacollectService.java 51 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209
  1. package com.yihu.hos.datacollect.service;
  2. import com.fasterxml.jackson.databind.ObjectMapper;
  3. import com.yihu.ehr.dbhelper.common.QueryCondition;
  4. import com.yihu.ehr.dbhelper.common.enums.DBType;
  5. import com.yihu.ehr.dbhelper.common.sqlparser.ParserMysql;
  6. import com.yihu.ehr.dbhelper.common.sqlparser.ParserOracle;
  7. import com.yihu.ehr.dbhelper.common.sqlparser.ParserSql;
  8. import com.yihu.ehr.dbhelper.common.sqlparser.ParserSqlserver;
  9. import com.yihu.ehr.dbhelper.jdbc.DBHelper;
  10. import com.yihu.ehr.dbhelper.mongodb.MongodbHelper;
  11. import com.yihu.hos.common.Services;
  12. import com.yihu.hos.core.log.Logger;
  13. import com.yihu.hos.core.log.LoggerFactory;
  14. import com.yihu.hos.crawler.model.config.SysConfig;
  15. import com.yihu.hos.crawler.model.patient.PatientIdentity;
  16. import com.yihu.hos.datacollect.dao.intf.IDatacollectDao;
  17. import com.yihu.hos.datacollect.dao.intf.IDatacollectLogDao;
  18. import com.yihu.hos.datacollect.model.*;
  19. import com.yihu.hos.datacollect.service.intf.IDatacollectManager;
  20. import com.yihu.hos.datacollect.service.intf.IDatacollectService;
  21. import com.yihu.hos.resource.service.IStdService;
  22. import com.yihu.hos.web.framework.constant.DateConvert;
  23. import com.yihu.hos.web.framework.constant.SqlConstants;
  24. import com.yihu.hos.web.framework.model.ActionResult;
  25. import com.yihu.hos.web.framework.util.GridFSUtil;
  26. import org.bson.types.ObjectId;
  27. import org.dom4j.Document;
  28. import org.dom4j.Element;
  29. import org.dom4j.io.SAXReader;
  30. import org.json.JSONArray;
  31. import org.json.JSONObject;
  32. import org.springframework.beans.factory.annotation.Autowired;
  33. import org.springframework.stereotype.Service;
  34. import org.springframework.transaction.annotation.Transactional;
  35. import javax.annotation.Resource;
  36. import java.io.ByteArrayInputStream;
  37. import java.sql.Blob;
  38. import java.text.ParseException;
  39. import java.text.SimpleDateFormat;
  40. import java.util.*;
  41. /**
  42. * 数据采集执行服务
  43. */
  44. @Service(Services.DatacollectService)
  45. public class DatacollectService implements IDatacollectService {
  46. static private final Logger logger = LoggerFactory.getLogger(DatacollectService.class);
  47. MongodbHelper mongoOrigin = new MongodbHelper("origin");
  48. MongodbHelper mongo = new MongodbHelper();
  49. String dateFormat = "yyyy-MM-dd HH:mm:ss"; //默认时间字符串格式
  50. int maxNum = 1000; //查询条数限制
  51. @Resource(name = Services.Datacollect)
  52. private IDatacollectManager datacollect;
  53. @Resource(name = Services.StdService)
  54. private IStdService stdService;
  55. @Resource(name = "DatacollectDao")
  56. private IDatacollectDao datacollectDao;
  57. @Resource(name = "DatacollectLogDao")
  58. private IDatacollectLogDao datacollectLogDao;
  59. @Autowired
  60. private ObjectMapper objectMapper;
  61. /**
  62. * 根据连接字符串获取数据库类型
  63. */
  64. private static DBType getDbType(String uri) {
  65. return uri.startsWith("jdbc:mysql") ? DBType.Mysql : (uri.startsWith("jdbc:oracle") ? DBType.Oracle : (uri.startsWith("jdbc:hive2") ? DBType.Hive : (uri.startsWith("jdbc:microsoft:sqlserver") ? DBType.Sqlserver : DBType.Mysql)));
  66. }
  67. // public static void main(String[] args) throws Exception {
  68. // //namespace是命名空间,methodName是方法名
  69. // String sql = "select count(1) as COUNT,max(to_number(HDSD03_01_031)) as MAX_KEYVALUE from HDSC01_02 where 1=1 order by to_number(HDSD03_01_031)";
  70. // //调用web Service//输出调用结果
  71. // System.out.println(WebserviceUtil.request("http://172.19.103.71:8080/service/sql?wsdl", "ExcuteSQL", new Object[]{"", sql}));
  72. //
  73. // }
  74. /**
  75. * 执行任务
  76. */
  77. @Override
  78. public void executeJob(String jobId) throws Exception {
  79. //获取任务详细信息
  80. RsJobConfig job = datacollect.getJobById(jobId);
  81. RsJobLog log = new RsJobLog();
  82. log.setJobId(jobId);
  83. log.setJobStartTime(new Date());
  84. datacollectLogDao.saveEntity(log);
  85. String logId = log.getId();
  86. logger.info("任务" + jobId + "开始采集,新增日志" + logId + "。");
  87. StringBuilder logStr = new StringBuilder();
  88. int count = 0;
  89. int success = 0;
  90. try {
  91. String schemeVersion = job.getSchemeVersion();
  92. //获取任务相关数据集
  93. List<DtoJobDataset> list = datacollectDao.getDatacollectDataset(jobId);
  94. logger.info("获取任务相关数据集,数量" + list.size() + "。");
  95. if (list != null && list.size() > 0) {
  96. count = list.size();
  97. logStr.append("/*********** 开始采集 *******************/\n");
  98. //遍历数据集
  99. for (DtoJobDataset ds : list) {
  100. try {
  101. String type = ds.getType();
  102. String message = "";
  103. logStr.append(DateConvert.toString(new Date(), dateFormat) + " " + ds.getJobDatasetName());
  104. if (type != null) {
  105. if (type.equals("1")) //Web Service
  106. {
  107. message = collectWebservice(ds, schemeVersion, logId) + "\n";
  108. } else if (type.equals("2"))//文件系统
  109. {
  110. message = "文件系统采集。\n";
  111. } else { //数据库
  112. message = collectTable(ds, schemeVersion, logId) + "\n";
  113. }
  114. } else {
  115. message = ds.getJobDatasetName() + "未关联数据源!\n";
  116. }
  117. logger.info(message); //文本日志
  118. logStr.append(message);
  119. success++;
  120. } catch (Exception ex) {
  121. logger.info("异常:" + ex.getMessage());
  122. logStr.append(ex.getMessage() + "\n");
  123. }
  124. }
  125. logStr.append("/*********** 结束采集 *******************/\n");
  126. }
  127. } catch (Exception ex) {
  128. ex.printStackTrace();
  129. logger.info("异常:" + ex.getMessage());
  130. logStr.append(ex.getMessage() + "\n");
  131. logStr.append("/*********** 出现异常,中断采集 *******************/\n");
  132. }
  133. //任务主日志成功
  134. String jobContent = logStr.toString().replace("\"", "\\\"");
  135. if (jobContent.length() > 4000) {
  136. jobContent = jobContent.substring(0, 4000);
  137. }
  138. log.setJobContent(jobContent);
  139. log.setJobEndTime(new Date());
  140. log.setJobDatasetCount(count);
  141. log.setJobDatasetSuccess(success);
  142. logger.info("任务结束," + count + "个数据集成功采集" + success + "个。");
  143. datacollectLogDao.updateEntity(log);
  144. }
  145. /**
  146. * 根据日志详细补采数据
  147. */
  148. @Override
  149. @Transactional
  150. public ActionResult repeatJob(String id) throws Exception {
  151. RsJobLogDetail log = datacollectLogDao.getEntity(RsJobLogDetail.class, id);
  152. if (log.getJobStatus().equals("2")) {
  153. return new ActionResult(false, "数据补采中!");
  154. }
  155. if (!log.getJobStatus().equals("0")) {
  156. return new ActionResult(false, "数据无需补采!");
  157. }
  158. try {
  159. log.setRepeatStartTime(new Date());
  160. log.setJobStatus("2"); //设置采集中状态
  161. datacollectLogDao.updateEntity(log);
  162. } catch (Exception e) {
  163. return new ActionResult(false, "补采失败!");
  164. }
  165. log.setJobStatus("0");
  166. datacollectLogDao.updateEntity(log);
  167. String stdDatasetCode = log.getStdDatasetCode();
  168. String sql = log.getJobSql();
  169. //数据库连接
  170. String datasourceId = log.getDatasourceId();
  171. String config = log.getConfig();
  172. DBHelper db = new DBHelper(datasourceId, config);
  173. //获取数据集字段映射结构
  174. String schemeVersion = log.getSchemeVersion();
  175. String datasetId = log.getJobDatasetId();
  176. List colString = stdService.getDatacolByScheme(schemeVersion, datasetId);
  177. JSONArray colList = new JSONArray(colString);
  178. List<JSONObject> list = db.query(sql);
  179. String message = intoMongodb(list, schemeVersion, stdDatasetCode, colList);
  180. if (message.length() > 0 || db.errorMessage.length() > 0) {
  181. log.setJobStatus("0");
  182. log.setRepeatEndTime(new Date());
  183. if (message.length() > 0) {
  184. log.setRepeatJobContent(message);
  185. } else {
  186. db.errorMessage.length();
  187. }
  188. datacollectLogDao.updateEntity(log);
  189. return new ActionResult(false, "补采失败!");
  190. } else {
  191. log.setJobStatus("3");
  192. log.setRepeatEndTime(new Date());
  193. log.setRepeatJobContent("补采成功!");
  194. datacollectLogDao.updateEntity(log);
  195. return new ActionResult(true, "补采成功!");
  196. }
  197. }
  198. /**
  199. * 根据数据库类型获取时间sql
  200. *
  201. * @return
  202. */
  203. private String getDateSqlByDBType(DBType dbType, Date date) throws Exception {
  204. String val = DateConvert.toString(date, dateFormat);
  205. if (dbType.equals(DBType.Mysql)) {
  206. return "date_format(\'" + val + "\',\'" + dateFormat + "\')";
  207. } else if (dbType.equals(DBType.Oracle)) {
  208. return "to_date(\'" + val + "\',\'" + dateFormat + "\')";
  209. } else {
  210. return val;
  211. }
  212. }
  213. /**
  214. * 根据数据库类型获取转换数值型sql
  215. */
  216. private String getToNumberSqlByDBType(DBType dbType, String key) throws Exception {
  217. if (dbType.equals(DBType.Mysql)) {
  218. return "cast(" + key + " as signed integer)";
  219. } else if (dbType.equals(DBType.Oracle)) {
  220. return "to_number(" + key + ")";
  221. } else {
  222. return key;
  223. }
  224. }
  225. /**
  226. * 根据数据库类型获取分页sql
  227. *
  228. * @return
  229. */
  230. private String getPageSqlByDBType(DBType dbType, String sql, int start, int rows) throws Exception {
  231. if (dbType.equals(DBType.Mysql)) {
  232. return sql + " LIMIT " + start + "," + rows;
  233. } else if (dbType.equals(DBType.Oracle)) {
  234. return " select * from (select t.*,ROWNUM RSCOM_RN from (" + sql + ") t where ROWNUM<" + (start + rows + 1) + ") where RSCOM_RN>= " + (start + 1);
  235. } else {
  236. return sql;
  237. }
  238. }
  239. /**
  240. * 字典全转换成中文
  241. */
  242. private List<JSONObject> translateDictCN(List<JSONObject> list, JSONArray colList, String schemeVersion) throws Exception {
  243. //获取字典列表
  244. List<DtoDictCol> dictColList = new ArrayList<>();
  245. for (int i = 0; i < colList.length(); i++) {
  246. JSONObject col = colList.getJSONObject(i);
  247. String dictId = col.optString("adapterDictId");
  248. if (dictId != null && dictId.length() > 0) {
  249. String dictType = col.optString("adapterDataType");
  250. String stdMetadataCode = col.optString("stdMetadataCode");
  251. DtoDictCol dictCol = new DtoDictCol();
  252. dictCol.setStdMetadataCode(stdMetadataCode);
  253. dictCol.setStdDictId(dictId);
  254. dictCol.setAdapterDataType(dictType.length() > 0 ? dictType : "1");//默认通过code转换字典
  255. //获取字典数据
  256. List dictString = stdService.getDictByScheme(schemeVersion, dictId);
  257. JSONArray dictAdapterArray = new JSONArray(dictString);
  258. dictCol.setDictList(dictAdapterArray);
  259. dictColList.add(dictCol);
  260. }
  261. }
  262. //翻译列表
  263. for (JSONObject data : list) {
  264. //遍历字典字段
  265. for (DtoDictCol col : dictColList) {
  266. String colNmae = col.getStdMetadataCode();
  267. String oldValue = data.optString(colNmae);
  268. String newValue = translateDictValueCN(oldValue, col.getAdapterDataType(), col.getDictList());
  269. if (newValue != null && newValue.length() > 0) {
  270. data.put(colNmae, newValue);
  271. }
  272. }
  273. }
  274. return list;
  275. }
  276. /**
  277. * 转译字典成中文
  278. *
  279. * @return
  280. */
  281. private String translateDictValueCN(String oldValue, String type, JSONArray dictAdapterList) throws Exception {
  282. if (type.equals("0")) //原本就是值
  283. {
  284. return oldValue;
  285. }
  286. //遍历字典数据(编码->名称)
  287. for (int i = 0; i < dictAdapterList.length(); i++) {
  288. JSONObject dictItem = dictAdapterList.getJSONObject(i);
  289. if (oldValue != null && dictItem.has("stdEntryCode")) {
  290. if (oldValue.equals(dictItem.getString("stdEntryCode"))) {
  291. String newValue = dictItem.getString("stdEntryValue"); //名称
  292. return newValue;
  293. }
  294. }
  295. }
  296. return oldValue;
  297. }
  298. /**
  299. * 字典转换
  300. *
  301. * @param list
  302. * @param colList
  303. * @return
  304. * @throws Exception
  305. */
  306. private List<JSONObject> translateDict(List<JSONObject> list, JSONArray colList, String schemeVersion) throws Exception {
  307. //获取字典列表
  308. List<DtoDictCol> dictColList = new ArrayList<>();
  309. for (int i = 0; i < colList.length(); i++) {
  310. JSONObject col = colList.getJSONObject(i);
  311. String dictId = col.optString("adapterDictId");
  312. if (dictId != null && dictId.length() > 0) {
  313. String dictType = col.optString("adapterDataType");
  314. String stdMetadataCode = col.optString("stdMetadataCode");
  315. DtoDictCol dictCol = new DtoDictCol();
  316. dictCol.setStdMetadataCode(stdMetadataCode);
  317. dictCol.setStdDictId(dictId);
  318. dictCol.setAdapterDataType(dictType.length() > 0 ? dictType : "1");//默认通过code转换字典
  319. //获取字典数据
  320. List dictString = stdService.getDictByScheme(schemeVersion, dictId);
  321. JSONArray dictAdapterArray = new JSONArray(dictString);
  322. dictCol.setDictList(dictAdapterArray);
  323. dictColList.add(dictCol);
  324. }
  325. }
  326. //翻译列表
  327. for (JSONObject data : list) {
  328. //遍历字典字段
  329. for (DtoDictCol col : dictColList) {
  330. String colNmae = col.getStdMetadataCode();
  331. String oldValue = data.optString(colNmae);
  332. String newValue = translateDictValue(oldValue, col.getAdapterDataType(), col.getDictList());
  333. if (newValue != null && newValue.length() > 0) {
  334. data.put(colNmae, newValue);
  335. }
  336. }
  337. }
  338. return list;
  339. }
  340. /**
  341. * 转译字典
  342. *
  343. * @return
  344. */
  345. private String translateDictValue(String oldValue, String type, JSONArray dictAdapterList) throws Exception {
  346. //应用标准字段
  347. String colName = "adapterEntryCode";
  348. if (type.equals("0")) //通过name转译
  349. {
  350. colName = "adapterEntryValue";
  351. }
  352. //遍历字典数据
  353. for (int i = 0; i < dictAdapterList.length(); i++) {
  354. JSONObject dictItem = dictAdapterList.getJSONObject(i);
  355. if (oldValue != null && dictItem.has(colName)) {
  356. if (oldValue.equals(dictItem.getString(colName))) {
  357. String newValue = dictItem.getString("stdEntryCode");
  358. return newValue;
  359. }
  360. }
  361. }
  362. //找不到适配字典数据则返回空
  363. return "";
  364. }
  365. /**
  366. * 获取过滤条件
  367. *
  368. * @return
  369. */
  370. private String getCondition(DBType dbType, String conditionString) {
  371. JSONArray array = new JSONArray(conditionString);
  372. if (array != null && array.length() > 0) {
  373. List<QueryCondition> conditions = new ArrayList<>();
  374. for (Object item : array) {
  375. JSONObject obj = (JSONObject) item;
  376. String logical = obj.getString("andOr");
  377. String operation = obj.getString("condition");
  378. String field = obj.getString("field");
  379. String keyword = obj.getString("value");
  380. conditions.add(new QueryCondition(logical, operation, field, keyword));
  381. }
  382. //条件语句转换
  383. ParserSql ps;
  384. switch (dbType) {
  385. case Oracle:
  386. ps = new ParserOracle();
  387. break;
  388. case Sqlserver:
  389. ps = new ParserSqlserver();
  390. break;
  391. default:
  392. ps = new ParserMysql();
  393. }
  394. return ps.getConditionSql(conditions);
  395. }
  396. return "";
  397. }
  398. /**
  399. * 获取条件SQL
  400. *
  401. * @param dbType
  402. * @param conditionString
  403. * @return
  404. * @throws ParseException
  405. */
  406. private String getConditionSql(DBType dbType, String conditionString) throws ParseException {
  407. String conditionSql = "";
  408. JSONArray conditions = new JSONArray(conditionString);
  409. Iterator iterator = conditions.iterator();
  410. while (iterator.hasNext()) {
  411. JSONObject condition = (JSONObject) iterator.next();
  412. String logic = condition.getString("condition");
  413. String andOr = condition.getString("andOr");
  414. String field = condition.getString("field");
  415. String value = condition.getString("value");
  416. String fieldType = condition.getString("type");
  417. String keys = "";
  418. if (andOr.equals(" AND ")) {
  419. conditionSql = conditionSql + " and ";
  420. } else {
  421. conditionSql = conditionSql + " or ";
  422. }
  423. if (logic.equals(" IN ") || logic.equals(" NOT IN ")) {
  424. String[] keywords = value.split(",");
  425. for (String key : keywords) {
  426. keys += "'" + key + "',";
  427. }
  428. keys = " (" + keys.substring(0, keys.length() - 1) + ") ";
  429. } else if (logic.equals(" LIKE ")) {
  430. keys += " '%" + value + "%' ";
  431. } else {
  432. if (fieldType.equals("DATE")) {
  433. keys += getDateFormatSql(dbType, value);
  434. } else {
  435. keys += " '" + value + "' ";
  436. }
  437. }
  438. conditionSql += field + logic + keys;
  439. }
  440. return conditionSql;
  441. }
  442. /**
  443. * 获取对应数据库时间格式
  444. *
  445. * @param dbType
  446. * @param key
  447. * @return
  448. * @throws ParseException
  449. */
  450. private String getDateFormatSql(DBType dbType, String key) throws ParseException {
  451. String dateFormat = "yyyy-MM-dd HH:mm:ss";
  452. SimpleDateFormat formatDate = new SimpleDateFormat("yyyy-MM-dd");
  453. Date d = formatDate.parse(key);
  454. SimpleDateFormat format = new SimpleDateFormat(dateFormat);
  455. switch (dbType) {
  456. case Oracle:
  457. key = "to_date(\'" + format.format(d) + "\',\'YYYY-MM-DD HH24:MI:SS\')";
  458. break;
  459. case Sqlserver:
  460. break;
  461. default:
  462. key = "date_format(\'" + format.format(d) + "\',\'%y-%m-%d %T\')";
  463. }
  464. return key;
  465. }
  466. /**
  467. * 采集入库
  468. *
  469. * @return
  470. */
  471. private String intoMongodb(List<JSONObject> list, String schemeVersion, String stdDatasetCode, JSONArray colList) {
  472. String patientIdCode = SqlConstants.PATIENT_ID.toUpperCase();
  473. String eventNoCode = SqlConstants.EVENT_NO.toUpperCase();
  474. PatientIdentity patientIdentity = SysConfig.getInstance().getPatientIdentity(stdDatasetCode);
  475. if (patientIdentity != null) {
  476. patientIdCode = patientIdentity.getPatientIDCode();
  477. eventNoCode = patientIdentity.getEventNoCode();
  478. }
  479. try {
  480. if (!mongo.createIndex(stdDatasetCode, "patientIndex", patientIdCode, eventNoCode)) {
  481. return "Mongodb索引创建失败!(表:" + stdDatasetCode + ")";
  482. }
  483. if (list != null && list.size() > 0) {
  484. //字典未转换前采集到原始库
  485. boolean b = mongoOrigin.insert(stdDatasetCode, translateDictCN(list, colList, schemeVersion));
  486. //字典转换
  487. list = translateDict(list, colList, schemeVersion);
  488. //采集到mongodb
  489. b = mongo.insert(stdDatasetCode, list);
  490. if (!b) {
  491. if (mongo.errorMessage != null && mongo.errorMessage.length() > 0) {
  492. logger.debug(mongo.errorMessage);
  493. return mongo.errorMessage;
  494. } else {
  495. return "Mongodb保存失败!(表:" + stdDatasetCode + ")";
  496. }
  497. }
  498. }
  499. } catch (Exception e) {
  500. return e.getMessage();
  501. }
  502. return "";
  503. }
  504. /**
  505. * 数据库表采集
  506. *
  507. * @return
  508. */
  509. private String collectTable(DtoJobDataset ds, String schemeVersion, String logId) throws Exception {
  510. String message = "";
  511. String datasetId = ds.getJobDatasetId();
  512. String jobDatasetName = ds.getJobDatasetName();
  513. String condition = ds.getJobDatasetCondition();
  514. String key = ds.getJobDatasetKey();
  515. String keytype = ds.getJobDatasetKeytype();
  516. String keyvalue = ds.getJobDatasetKeyvalue();
  517. String orgCode = ds.getOrgCode();
  518. String datasourceId = ds.getDatasourceId();
  519. String config = ds.getConfig(); //数据库连接
  520. DBHelper db = new DBHelper(datasourceId, config);
  521. DBType dbType = db.dbType;
  522. //获取数据集映射
  523. List datasetString = stdService.getDatasetByScheme(schemeVersion, datasetId);
  524. JSONArray datasetList = new JSONArray(datasetString);
  525. if (datasetList != null && datasetList.length() > 0) {
  526. String stdTableName = datasetList.getJSONObject(0).optString("stdDatasetCode");
  527. String adapterTableName = datasetList.getJSONObject(0).optString("adapterDatasetCode");
  528. //获取数据集字段映射结构
  529. List colString = stdService.getDatacolByScheme(schemeVersion, datasetId);
  530. JSONArray colList = new JSONArray(colString);
  531. if (colList != null && colList.length() > 0) {
  532. //拼接查询sql
  533. String strSql = "Select '" + orgCode + "' as RSCOM_ORG_CODE";
  534. for (int i = 0; i < colList.length(); i++) {
  535. JSONObject col = colList.getJSONObject(i);
  536. String adapterMetadataCode = col.optString("adapterMetadataCode");
  537. if (adapterMetadataCode.length() > 0) {
  538. strSql += "," + adapterMetadataCode + " as " + col.optString("stdMetadataCode");
  539. }
  540. }
  541. strSql += " from " + adapterTableName;
  542. String strWhere = " where 1=1";
  543. //采集范围
  544. if (condition != null && condition.length() > 0) {
  545. strWhere += getConditionSql(dbType, condition);
  546. }
  547. //增量采集
  548. String maxKey = "0";
  549. if (key != null && key.length() > 0) {
  550. maxKey = key;
  551. if (keytype.toUpperCase().equals("DATE")) //时间类型
  552. {
  553. if (keyvalue != null && keyvalue.length() > 0) {
  554. Date keyDate = new Date();
  555. //字符串转时间
  556. keyDate = DateConvert.toDate(keyvalue);
  557. //根据数据库类型获取时间sql
  558. strWhere += " and " + maxKey + ">'" + getDateSqlByDBType(dbType, keyDate) + "'";
  559. }
  560. } else if (keytype.toUpperCase().equals("VARCHAR")) //字符串类型
  561. {
  562. maxKey = getToNumberSqlByDBType(dbType, key);
  563. if (keyvalue != null && keyvalue.length() > 0) {
  564. strWhere += " and " + maxKey + ">'" + keyvalue + "'";
  565. }
  566. } else {
  567. if (keyvalue != null && keyvalue.length() > 0) {
  568. strWhere += " and " + maxKey + ">'" + keyvalue + "'";
  569. }
  570. }
  571. strWhere += " order by " + maxKey;
  572. }
  573. strSql += strWhere;
  574. //总条数
  575. String sqlCount = "select count(1) as COUNT from (" + strSql + ")";
  576. String sqlMax = "select max(" + maxKey + ") as MAX_KEYVALUE from " + adapterTableName + strWhere;
  577. JSONObject objCount = db.load(sqlCount);
  578. if (objCount == null) {
  579. if (db.errorMessage.length() > 0) {
  580. throw new Exception(db.errorMessage);
  581. } else {
  582. throw new Exception("查询异常:" + sqlCount);
  583. }
  584. } else {
  585. int count = objCount.getInt("COUNT");
  586. if (count == 0) //0条记录,无需采集
  587. {
  588. message = "0条记录,无需采集。";
  589. } else {
  590. //获取最大值
  591. JSONObject objMax = db.load(sqlMax);
  592. int successCount = 0;
  593. String maxKeyvalue = objMax.optString("MAX_KEYVALUE");
  594. //修改最大值
  595. if (maxKeyvalue != null && maxKeyvalue.length() > 0) {
  596. datacollectLogDao.updateJobDatasetKeyvalue(ds.getId(), maxKeyvalue);
  597. logger.info("修改任务数据集最大值为" + maxKeyvalue + "。"); //文本日志
  598. }
  599. int countPage = 1;
  600. if (count > maxNum) //分页采集
  601. {
  602. countPage = count / maxNum + 1;
  603. }
  604. for (int i = 0; i < countPage; i++) {
  605. int rows = maxNum;
  606. if (i + 1 == countPage) {
  607. rows = count - i * maxNum;
  608. }
  609. String sql = getPageSqlByDBType(dbType, strSql, i * maxNum, rows); //获取分页sql语句
  610. RsJobLogDetail detail = new RsJobLogDetail();
  611. detail.setStartTime(new Date());
  612. detail.setJobLogId(logId);
  613. detail.setDatasourceId(datasourceId);
  614. detail.setConfig(config);
  615. detail.setStdDatasetCode(stdTableName);
  616. detail.setJobDatasetId(datasetId);
  617. detail.setJobDatasetName(ds.getJobDatasetName());
  618. detail.setJobId(ds.getJobId());
  619. detail.setJobSql(sql);
  620. detail.setJobNum(i + 1);
  621. detail.setJobDatasetRows(rows);
  622. detail.setSchemeVersion(schemeVersion);
  623. List<JSONObject> list = db.query(sql);
  624. String msg = "";
  625. if (list != null) {
  626. msg = intoMongodb(list, schemeVersion, stdTableName, colList); //返回信息
  627. } else {
  628. if (db.errorMessage.length() > 0) {
  629. msg = db.errorMessage;
  630. } else {
  631. msg = "查询数据为空!";
  632. }
  633. }
  634. if (msg.length() > 0) {
  635. //任务日志细表异常操作
  636. detail.setJobStatus("0");
  637. detail.setJobContent(msg);
  638. logger.info(msg); //文本日志
  639. } else {
  640. detail.setJobStatus("1");
  641. detail.setJobContent("采集成功!");
  642. successCount += rows;
  643. }
  644. detail.setEndTime(new Date());
  645. datacollectLogDao.saveEntity(detail);
  646. }
  647. message = jobDatasetName + "采集成功" + successCount + "条数据,总条数" + count + "条。";
  648. }
  649. }
  650. } else {
  651. throw new Exception(jobDatasetName + "数据集字段映射为空!");
  652. }
  653. } else {
  654. throw new Exception(jobDatasetName + "数据集映射为空!");
  655. }
  656. logger.info(message);
  657. return message;
  658. }
  659. /**
  660. * XML转JSONList
  661. *
  662. * @return
  663. */
  664. private List<JSONObject> getListFromXml(String xml) throws Exception {
  665. SAXReader reader = new SAXReader();
  666. Document doc = reader.read(new ByteArrayInputStream(xml.getBytes("UTF-8")));
  667. Element root = doc.getRootElement();
  668. List<JSONObject> re = new ArrayList<>();
  669. //xml数据列表
  670. Iterator iter = root.elementIterator("Data");
  671. while (iter.hasNext()) {
  672. JSONObject obj = new JSONObject();
  673. Element el = (Element) iter.next();
  674. Iterator cols = el.elementIterator();
  675. while (cols.hasNext()) {
  676. Element col = (Element) cols.next();
  677. obj.put(col.getName().toUpperCase(), col.getStringValue());
  678. }
  679. re.add(obj);
  680. }
  681. return re;
  682. }
  683. /**
  684. * webservice采集
  685. *
  686. * @return
  687. */
  688. private String collectWebservice(DtoJobDataset ds, String schemeVersion, String logId) throws Exception {
  689. String message = "";
  690. String datasetId = ds.getJobDatasetId();
  691. String jobDatasetName = ds.getJobDatasetName();
  692. String condition = ds.getJobDatasetCondition();
  693. String key = ds.getJobDatasetKey();
  694. String keytype = ds.getJobDatasetKeytype();
  695. String keyvalue = ds.getJobDatasetKeyvalue();
  696. String orgCode = ds.getOrgCode();
  697. String datasourceId = ds.getDatasourceId();
  698. String config = ds.getConfig(); //数据库连接
  699. DBType dbType = DBType.Oracle;//********** 先定死Oracle ****************************
  700. //webservice地址
  701. Map<String, String> mapConfig = objectMapper.readValue(config, Map.class);
  702. if (mapConfig.containsKey("protocol") && mapConfig.containsKey("url")) {
  703. String url = mapConfig.get("protocol") + "://" + mapConfig.get("url");
  704. //获取数据集映射
  705. List datasetString = stdService.getDatasetByScheme(schemeVersion, datasetId);
  706. JSONArray datasetList = new JSONArray(datasetString);
  707. if (datasetList != null && datasetList.length() > 0) {
  708. String stdTableName = datasetList.getJSONObject(0).optString("stdDatasetCode");
  709. String adapterTableName = datasetList.getJSONObject(0).optString("adapterDatasetCode");
  710. //获取数据集字段映射结构
  711. List colString = stdService.getDatacolByScheme(schemeVersion, datasetId);
  712. JSONArray colList = new JSONArray(colString);
  713. if (colList != null && colList.length() > 0) {
  714. //拼接查询sql
  715. String strSql = "Select '" + orgCode + "' as RSCOM_ORG_CODE";
  716. for (int i = 0; i < colList.length(); i++) {
  717. JSONObject col = colList.getJSONObject(i);
  718. String adapterMetadataCode = col.optString("adapterMetadataCode");
  719. if (adapterMetadataCode.length() > 0) {
  720. strSql += "," + adapterMetadataCode + " as " + col.optString("stdMetadataCode");
  721. }
  722. }
  723. strSql += " from " + adapterTableName;
  724. String strWhere = " where 1=1";
  725. //采集范围
  726. if (condition != null && condition.length() > 0) {
  727. strWhere += getConditionSql(dbType, condition);
  728. }
  729. //增量采集
  730. String maxKey = "0";
  731. String keyValue = ds.getJobDatasetKeyvalue();
  732. if (key != null && key.length() > 0) {
  733. maxKey = key;
  734. if (keytype.toUpperCase().equals("DATE")) //时间类型
  735. {
  736. Date keyDate = new Date();
  737. if (keyvalue != null && keyvalue.length() > 0) {
  738. //字符串转时间
  739. keyDate = DateConvert.toDate(keyvalue);
  740. //根据数据库类型获取时间sql
  741. strWhere += " and " + key + ">'" + getDateSqlByDBType(dbType, keyDate) + "'";
  742. strWhere += " order by " + key;
  743. }
  744. } else if (keytype.toUpperCase().equals("VARCHAR")) //字符串类型
  745. {
  746. maxKey = getToNumberSqlByDBType(dbType, key);
  747. if (keyvalue != null && keyvalue.length() > 0) {
  748. strWhere += " and " + maxKey + ">'" + keyvalue + "'";
  749. strWhere += " order by " + maxKey;
  750. }
  751. } else {
  752. if (keyvalue != null && keyvalue.length() > 0) {
  753. strWhere += " and " + key + ">'" + keyvalue + "'";
  754. strWhere += " order by " + key;
  755. }
  756. }
  757. }
  758. strSql += strWhere;
  759. //总条数和最大值查询
  760. String sqlCount = "select count(1) as COUNT from (" + strSql + ")";
  761. String sqlMax = "select max(" + maxKey + ") as MAX_KEYVALUE from " + adapterTableName + strWhere;
  762. //webservice获取数据总条数
  763. String strCount = "";//WebserviceUtil.request(url, "ExcuteSQL", new Object[]{"", sqlCount});
  764. List<JSONObject> dataCount = getListFromXml(strCount);
  765. if (dataCount != null && dataCount.size() > 0) {
  766. Integer count = Integer.parseInt(dataCount.get(0).getString("COUNT"));
  767. if (count == 0) //0条记录,无需采集
  768. {
  769. message = "0条记录,无需采集。";
  770. } else {
  771. //webservice获取最大值
  772. String strMax = ""; //WebserviceUtil.request(url, "ExcuteSQL", new Object[]{"", sqlMax});
  773. List<JSONObject> dataMax = getListFromXml(strCount);
  774. int successCount = 0;
  775. String maxKeyvalue = dataMax.get(0).getString("MAX_KEYVALUE");
  776. //修改最大值
  777. if (maxKeyvalue != null && maxKeyvalue.length() > 0) {
  778. datacollectLogDao.updateJobDatasetKeyvalue(ds.getId(), maxKeyvalue);
  779. logger.info("修改任务数据集最大值为" + maxKeyvalue + "。"); //文本日志
  780. }
  781. int countPage = 1;
  782. if (count > maxNum) //分页采集
  783. {
  784. countPage = count / maxNum + 1;
  785. }
  786. for (int i = 0; i < countPage; i++) {
  787. int rows = maxNum;
  788. if (i + 1 == countPage) {
  789. rows = count - i * maxNum;
  790. }
  791. String sql = getPageSqlByDBType(dbType, strSql, i * maxNum, rows); //获取分页sql语句
  792. RsJobLogDetail detail = new RsJobLogDetail();
  793. detail.setStartTime(new Date());
  794. detail.setJobLogId(logId);
  795. detail.setDatasourceId(datasourceId);
  796. detail.setConfig(config);
  797. detail.setStdDatasetCode(stdTableName);
  798. detail.setJobDatasetId(datasetId);
  799. detail.setJobDatasetName(ds.getJobDatasetName());
  800. detail.setJobId(ds.getJobId());
  801. detail.setJobSql(sql);
  802. detail.setJobNum(i + 1);
  803. detail.setJobDatasetRows(rows);
  804. detail.setSchemeVersion(schemeVersion);
  805. String msg = "";
  806. try {
  807. //获取分页数据
  808. String strList = ""; //WebserviceUtil.request(url, "ExcuteSQL", new Object[]{"", sql});
  809. List<JSONObject> list = getListFromXml(strList);
  810. if (list != null) {
  811. msg = intoMongodb(list, schemeVersion, stdTableName, colList); //返回信息
  812. } else {
  813. msg = "查询数据为空!";
  814. }
  815. if (msg.length() > 0) {
  816. //任务日志细表异常操作
  817. detail.setJobStatus("0");
  818. detail.setJobContent(msg);
  819. logger.info(msg); //文本日志
  820. } else {
  821. detail.setJobStatus("1");
  822. detail.setJobContent("采集成功!");
  823. successCount += rows;
  824. }
  825. } catch (Exception ex) {
  826. msg = ex.getMessage();
  827. }
  828. detail.setEndTime(new Date());
  829. datacollectLogDao.saveEntity(detail);
  830. }
  831. message = jobDatasetName + "采集成功" + successCount + "条数据,总条数" + count + "条。";
  832. }
  833. }
  834. } else {
  835. throw new Exception(jobDatasetName + "数据集字段映射为空!");
  836. }
  837. } else {
  838. throw new Exception(jobDatasetName + "数据集映射为空!");
  839. }
  840. } else {
  841. throw new Exception("非法webservice路径!");
  842. }
  843. logger.info(message);
  844. return message;
  845. }
  846. /**
  847. * 采集入库(包含blob字段处理)
  848. * @return
  849. */
  850. private String intoMongodb2(List<JSONObject> list,String schemeVersion,String stdDatasetCode,JSONArray colList)
  851. {
  852. String patientIdCode = SqlConstants.PATIENT_ID.toUpperCase();
  853. String eventNoCode = SqlConstants.EVENT_NO.toUpperCase();
  854. PatientIdentity patientIdentity = SysConfig.getInstance().getPatientIdentity(stdDatasetCode);
  855. if (patientIdentity != null) {
  856. patientIdCode = patientIdentity.getPatientIDCode();
  857. eventNoCode = patientIdentity.getEventNoCode();
  858. }
  859. try{
  860. if(!mongo.createIndex(stdDatasetCode, "patientIndex", patientIdCode, eventNoCode)) {
  861. return "Mongodb索引创建失败!(表:"+stdDatasetCode+")";
  862. }
  863. if(list!=null && list.size()>0)
  864. {
  865. //TODO TOSET 判断是否是非结构化数据集
  866. if ("unstructured".equals(stdDatasetCode)){
  867. for (JSONObject jsonObject:list) {
  868. //文件内容保存到GridFS,细表内容字段保存为文件objctId
  869. Blob blob = (Blob) jsonObject.get("CONTENT");
  870. String type = (String) jsonObject.get("FILE_TYPE");
  871. String patientId= (String) jsonObject.get("patient_id");
  872. String eventNo= (String) jsonObject.get("event_no");
  873. Map<String,Object> params = new HashMap<>();
  874. params.put("patient_id",patientId);
  875. params.put("event_no",eventNo);
  876. try {
  877. ObjectId objectId = GridFSUtil.uploadFile("files", blob, type, params);
  878. jsonObject.put("CONTENT", objectId);
  879. } catch (Exception e) {
  880. e.printStackTrace();
  881. }
  882. }
  883. }
  884. //字典未转换前采集到原始库
  885. boolean b = mongoOrigin.insert(stdDatasetCode,translateDictCN(list, colList,schemeVersion));
  886. //字典转换
  887. list = translateDict(list, colList,schemeVersion);
  888. //采集到mongodb
  889. b = mongo.insert(stdDatasetCode,list);
  890. if(!b)
  891. {
  892. if(mongo.errorMessage!=null && mongo.errorMessage.length()>0)
  893. {
  894. System.out.print(mongo.errorMessage);
  895. return mongo.errorMessage;
  896. }
  897. else {
  898. return "Mongodb保存失败!(表:"+stdDatasetCode+")";
  899. }
  900. }
  901. }
  902. }
  903. catch (Exception e)
  904. {
  905. return e.getMessage();
  906. }
  907. return "";
  908. }
  909. /**
  910. * 数据库采集(包含Blob类型数据)
  911. * @param ds
  912. * @param schemeVersion
  913. * @param logId
  914. * @return
  915. * @throws Exception
  916. */
  917. private String collectBlobTable(DtoJobDataset ds,String schemeVersion,String logId) throws Exception
  918. {
  919. String message = "";
  920. String datasetId = ds.getJobDatasetId();
  921. String jobDatasetName = ds.getJobDatasetName();
  922. String condition=ds.getJobDatasetCondition();
  923. String key=ds.getJobDatasetKey();
  924. String keytype=ds.getJobDatasetKeytype();
  925. String keyvalue=ds.getJobDatasetKeyvalue();
  926. String orgCode = ds.getOrgCode();
  927. String datasourceId = ds.getDatasourceId();
  928. String config = ds.getConfig(); //数据库连接
  929. DBHelper db = new DBHelper(datasourceId,config);
  930. DBType dbType = db.dbType;
  931. //获取数据集映射
  932. List datasetString = stdService.getDatasetByScheme(schemeVersion, datasetId);
  933. JSONArray datasetList = new JSONArray(datasetString);
  934. if(datasetList!=null &&datasetList.length()>0)
  935. {
  936. String stdTableName = datasetList.getJSONObject(0).optString("stdDatasetCode");
  937. String adapterTableName = datasetList.getJSONObject(0).optString("adapterDatasetCode");
  938. //获取数据集字段映射结构
  939. List colString = stdService.getDatacolByScheme(schemeVersion,datasetId);
  940. JSONArray colList = new JSONArray(colString);
  941. if(colList!=null && colList.length()>0)
  942. {
  943. //拼接查询sql
  944. String strSql = "Select '" + orgCode +"' as RSCOM_ORG_CODE";
  945. for(int i=0; i< colList.length();i++)
  946. {
  947. JSONObject col = colList.getJSONObject(i);
  948. String adapterMetadataCode = col.optString("adapterMetadataCode");
  949. if(adapterMetadataCode.length()>0)
  950. {
  951. strSql+= ","+adapterMetadataCode +" as " + col.optString("stdMetadataCode") ;
  952. }
  953. }
  954. strSql += " from " +adapterTableName;
  955. String strWhere = " where 1=1";
  956. //采集范围
  957. if(condition!=null && condition.length()>0)
  958. {
  959. strWhere += getConditionSql(dbType,condition);
  960. }
  961. //增量采集
  962. String maxKey = "0";
  963. if(key!=null && key.length()>0)
  964. {
  965. maxKey = key;
  966. if(keytype.toUpperCase().equals("DATE")) //时间类型
  967. {
  968. if(keyvalue!=null && keyvalue.length()>0) {
  969. Date keyDate = new Date();
  970. //字符串转时间
  971. keyDate = DateConvert.toDate(keyvalue);
  972. //根据数据库类型获取时间sql
  973. strWhere += " and "+ maxKey + ">'"+getDateSqlByDBType(dbType,keyDate)+"'";
  974. }
  975. }
  976. else if(keytype.toUpperCase().equals("VARCHAR")) //字符串类型
  977. {
  978. maxKey = getToNumberSqlByDBType(dbType,key);
  979. if(keyvalue!=null && keyvalue.length()>0) {
  980. strWhere += " and "+ maxKey + ">'" + keyvalue + "'";
  981. }
  982. }
  983. else{
  984. if(keyvalue!=null && keyvalue.length()>0) {
  985. strWhere += " and "+ maxKey + ">'" + keyvalue + "'";
  986. }
  987. }
  988. strWhere += " order by " + maxKey;
  989. }
  990. strSql += strWhere;
  991. //总条数
  992. String sqlCount = "select count(1) as COUNT from (" + strSql+")";
  993. String sqlMax = "select max(" + maxKey + ") as MAX_KEYVALUE from " + adapterTableName + strWhere;
  994. JSONObject objCount = db.load(sqlCount);
  995. if(objCount==null)
  996. {
  997. if(db.errorMessage.length()>0)
  998. {
  999. throw new Exception(db.errorMessage);
  1000. }
  1001. else{
  1002. throw new Exception("查询异常:"+sqlCount);
  1003. }
  1004. }
  1005. else{
  1006. int count = objCount.getInt("COUNT");
  1007. if(count==0) //0条记录,无需采集
  1008. {
  1009. message = "0条记录,无需采集。";
  1010. }
  1011. else
  1012. {
  1013. //获取最大值
  1014. JSONObject objMax = db.load(sqlMax);
  1015. int successCount = 0;
  1016. String maxKeyvalue = objMax.optString("MAX_KEYVALUE");
  1017. //修改最大值
  1018. if(maxKeyvalue!=null&& maxKeyvalue.length()>0)
  1019. {
  1020. datacollectLogDao.updateJobDatasetKeyvalue(ds.getId(),maxKeyvalue);
  1021. logger.info("修改任务数据集最大值为"+maxKeyvalue+"。"); //文本日志
  1022. }
  1023. int countPage = 1;
  1024. if(count > maxNum) //分页采集
  1025. {
  1026. countPage = count/maxNum+1;
  1027. }
  1028. for(int i=0;i<countPage;i++)
  1029. {
  1030. int rows = maxNum;
  1031. if(i+1==countPage){
  1032. rows = count-i*maxNum;
  1033. }
  1034. String sql = getPageSqlByDBType(dbType,strSql,i*maxNum,rows); //获取分页sql语句
  1035. RsJobLogDetail detail = new RsJobLogDetail();
  1036. detail.setStartTime(new Date());
  1037. detail.setJobLogId(logId);
  1038. detail.setDatasourceId(datasourceId);
  1039. detail.setConfig(config);
  1040. detail.setStdDatasetCode(stdTableName);
  1041. detail.setJobDatasetId(datasetId);
  1042. detail.setJobDatasetName(ds.getJobDatasetName());
  1043. detail.setJobId(ds.getJobId());
  1044. detail.setJobSql(sql);
  1045. detail.setJobNum(i+1);
  1046. detail.setJobDatasetRows(rows);
  1047. detail.setSchemeVersion(schemeVersion);
  1048. List<JSONObject> list = db.query(sql);
  1049. String msg = "";
  1050. if(list!=null)
  1051. {
  1052. msg = intoMongodb2(list,schemeVersion,stdTableName,colList); //返回信息
  1053. }
  1054. else{
  1055. if(db.errorMessage.length()>0)
  1056. {
  1057. msg = db.errorMessage;
  1058. }
  1059. else{
  1060. msg = "查询数据为空!";
  1061. }
  1062. }
  1063. if(msg.length()>0)
  1064. {
  1065. //任务日志细表异常操作
  1066. detail.setJobStatus("0");
  1067. detail.setJobContent(msg);
  1068. }
  1069. else{
  1070. detail.setJobStatus("1");
  1071. detail.setJobContent("采集成功!");
  1072. successCount += rows;
  1073. }
  1074. detail.setEndTime(new Date());
  1075. datacollectLogDao.saveEntity(detail);
  1076. }
  1077. message = jobDatasetName + "采集成功"+successCount+"条数据,总条数"+count+"条。";
  1078. }
  1079. }
  1080. }
  1081. else
  1082. {
  1083. throw new Exception(jobDatasetName + "数据集字段映射为空!");
  1084. }
  1085. }
  1086. else{
  1087. throw new Exception(jobDatasetName + "数据集映射为空!");
  1088. }
  1089. return message;
  1090. }
  1091. }