DatacollectManager.java 21 KB


  1. package com.yihu.hos.datacollect.service;
  2. import com.fasterxml.jackson.databind.JsonNode;
  3. import com.fasterxml.jackson.databind.ObjectMapper;
  4. import com.fasterxml.jackson.databind.node.ArrayNode;
  5. import com.fasterxml.jackson.databind.node.ObjectNode;
  6. import com.yihu.hos.common.Services;
  7. import com.yihu.hos.datacollect.dao.DatacollectDao;
  8. import com.yihu.hos.datacollect.model.*;
  9. import com.yihu.hos.resource.model.RsDatasourceDataset;
  10. import com.yihu.hos.resource.service.StdService;
  11. import com.yihu.hos.standard.model.adapter.AdapterDatasetModel;
  12. import com.yihu.hos.standard.model.standard.StdDataSetModel;
  13. import com.yihu.hos.system.service.FlowManager;
  14. import com.yihu.hos.web.framework.model.ActionResult;
  15. import com.yihu.hos.web.framework.model.DataGridResult;
  16. import com.yihu.hos.web.framework.model.DictItem;
  17. import com.yihu.hos.web.framework.model.SimpleChartItem;
  18. import net.sf.json.JSONArray;
  19. import net.sf.json.JSONObject;
  20. import org.springframework.beans.BeanUtils;
  21. import org.springframework.beans.factory.annotation.Autowired;
  22. import org.springframework.stereotype.Service;
  23. import org.springframework.transaction.annotation.Transactional;
  24. import javax.annotation.Resource;
  25. import java.util.ArrayList;
  26. import java.util.List;
  27. import java.util.Map;
  28. @Service(Services.Datacollect)
  29. public class DatacollectManager{
  // Spring bean name of this service; the same constant is passed to @Service on the class.
  30. public static final String BEAN_ID = Services.Datacollect;
  // DAO behind every job, job-dataset, datasource-dataset and job-log query in this class.
  31. @Resource(name = DatacollectDao.BEAN_ID)
  32. private DatacollectDao datacollectDao;
  // Standard/adapter service: supplies the datasets and columns of a scheme or standard version.
  33. @Resource(name = StdService.BEAN_ID)
  34. private StdService stdManager;
  // Flow service: used here to generate, update and delete the flow attached to a job.
  35. @Resource(name = FlowManager.BEAN_ID)
  36. private FlowManager flowManage;
  // Jackson mapper used to turn model lists into JSON trees and to parse incoming JSON strings.
  37. @Autowired
  38. private ObjectMapper objectMapper;
  // Quartz integration is disabled: the field and every quartzManager call in this class are commented out.
  39. // @Autowired
  40. // private QuartzManager quartzManager;
  41. /*********************** 任务管理 ****************************************/
  42. /**
  43. * 根据JobId获取Cron表达式
  44. */
  45. public String getCronByJobId(String jobId) throws Exception {
  46. return datacollectDao.getCronByJobId(jobId);
  47. }
  48. /**
  49. * 根据ID获取任务详细
  50. *
  51. * @return
  52. */
  53. public RsJobConfig getJobById(String id) throws Exception {
  54. return datacollectDao.getEntity(RsJobConfig.class, id);
  55. }
  56. /**
  57. * 获取任务列表
  58. */
  59. public DataGridResult getJobList(Map<String, Object> conditionMap, int page, int rows) throws Exception {
  60. DataGridResult re = datacollectDao.getJobList(conditionMap, page, rows);
  61. //获取任务列表
  62. List<RsJobConfig> list = re.getDetailModelList();
  63. if (list != null && list.size() > 0) {
  64. List<DtoJobConfig> dtoList = new ArrayList<>();
  65. for (RsJobConfig job : list) {
  66. DtoJobConfig dto = new DtoJobConfig();
  67. dto.setId(job.getId());
  68. dto.setJobContent(job.getJobContent());
  69. dto.setJobContentType(job.getJobContentType());
  70. dto.setJobInfo(job.getJobInfo());
  71. dto.setJobName(job.getJobName());
  72. dto.setJobNextTime(job.getJobNextTime());
  73. //dto.setJobTimeInterval(job.getJobTimeInterval());
  74. dto.setJobType(job.getJobType());
  75. dto.setSchemeId(job.getSchemeId());
  76. dto.setValid(job.getValid());
  77. //获取版本名称
  78. String schemeAndVersion = datacollectDao.getSchemeAndVersion(job.getSchemeVersion());
  79. dto.setSchemeAndVersion(schemeAndVersion);
  80. //获取关联数据集
  81. List<RsJobDataset> datasetList = datacollectDao.getJobDataset(job.getId());
  82. if (datasetList != null && datasetList.size() > 0) {
  83. String ds = "";
  84. for (RsJobDataset dataset : datasetList) {
  85. ds += dataset.getJobDatasetName() + ",";
  86. }
  87. ds = ds.substring(0, ds.length() - 1);
  88. dto.setJobDataset(ds);
  89. }
  90. // String cron = datacollectDao.getCronByJobId(job.getId());
  91. String cron = job.getJobCron();
  92. ////执行计划翻译
  93. dto.setJobPlan(translationCron(cron));
  94. dtoList.add(dto);
  95. }
  96. re.setDetailModelList(dtoList);
  97. }
  98. return re;
  99. }
  100. /**
  101. * 根据适配方案获取数据集列表
  102. */
  103. public DataGridResult getSchemeDataset(String schemeId, String schemeVersion, String jobId) throws Exception {
  104. //获取适配数据集总和
  105. List<AdapterDatasetModel> datasetString = stdManager.getDatasetByScheme(schemeVersion);
  106. String jsonlist = objectMapper.writeValueAsString(datasetString);
  107. ArrayNode jsonArray = objectMapper.readValue(jsonlist, ArrayNode.class);
  108. //获取任务数据集
  109. List<RsJobDataset> jobDataset = datacollectDao.getJobDataset(jobId);
  110. DataGridResult re = new DataGridResult();
  111. List<DtoJobDataset> list = new ArrayList<>();
  112. for(JsonNode jsonItem : jsonArray)
  113. {
  114. String datasetId= jsonItem.get("adapterDatasetId").toString();
  115. //配置完整才显示
  116. if(datasetId.length()>0&&jsonItem.get("adapterDatasetName").toString().length()>0&&jsonItem.get("adapterDatasetName").toString()!="null"&&jsonItem.get("adapterDatasetCode").toString().length()>0&&jsonItem.get("adapterDatasetCode").toString()!="null")
  117. {
  118. DtoJobDataset obj = new DtoJobDataset();
  119. obj.setJobDatasetName(jsonItem.get("adapterDatasetName").asText());
  120. obj.setJobDatasetCode(jsonItem.get("adapterDatasetCode").asText());
  121. obj.setJobDatasetId(datasetId);
  122. //是否关联任务
  123. if(jobDataset!=null&&jobDataset.size()>0)
  124. {
  125. for(RsJobDataset jd :jobDataset)
  126. {
  127. if(jd.getJobDatasetId().equals(datasetId))
  128. {
  129. obj.setId(jd.getId());
  130. obj.setJobId(jobId);
  131. obj.setJobDatasetKeyvalue(jd.getJobDatasetKeyvalue());
  132. obj.setJobDatasetKey(jd.getJobDatasetKey());
  133. obj.setJobDatasetKeytype(jd.getJobDatasetKeytype());
  134. obj.setChecked("1");
  135. obj.setJobDatasetCondition(jd.getJobDatasetCondition());
  136. break;
  137. }
  138. }
  139. }
  140. list.add(obj);
  141. }
  142. }
  143. re.setDetailModelList(list);
  144. return re;
  145. }
  146. /**
  147. * 根据适配方案获取字段列表
  148. */
  149. public DataGridResult getSchemeDatasetCol(String schemeId, String schemeVersion, String datasetId) throws Exception {
  150. //获取适配字段
  151. List datacolString = stdManager.getDatacolByScheme(schemeVersion, datasetId);
  152. JSONArray datacolList = JSONArray.fromObject(datacolString);
  153. DataGridResult re = new DataGridResult();
  154. List<DtoDatasetCol> list = new ArrayList<>();
  155. for(Object item : datacolList)
  156. {
  157. JSONObject jsonItem = JSONObject.fromObject(item);
  158. DtoDatasetCol obj = new DtoDatasetCol();
  159. obj.setCode(jsonItem.getString("adapterMetadataCode"));
  160. obj.setText(jsonItem.getString("adapterMetadataName"));
  161. obj.setType(jsonItem.getString("adapterMetadataType"));
  162. obj.setDict(jsonItem.getString("stdDictId"));
  163. list.add(obj);
  164. }
  165. re.setDetailModelList(list);
  166. return re;
  167. }
  168. /**
  169. * 根据任务Id获取相关数据集下拉数据
  170. */
  171. public DataGridResult getJobDatasetByJobId(String jobId) throws Exception {
  172. //获取任务数据集
  173. List<RsJobDataset> jobDataset = datacollectDao.getJobDataset(jobId);
  174. DataGridResult re = new DataGridResult();
  175. List<DictItem> list = new ArrayList<>();
  176. for (RsJobDataset obj : jobDataset) {
  177. DictItem item = new DictItem();
  178. item.setValue(obj.getJobDatasetName());
  179. item.setCode(obj.getJobDatasetId());
  180. list.add(item);
  181. }
  182. re.setDetailModelList(list);
  183. return re;
  184. }
  185. /**
  186. * 新增任务
  187. */
  188. @Transactional
  189. public ActionResult addJob(RsJobConfig obj, String cron, String jobDataset) throws Exception {
  190. datacollectDao.saveEntity(obj);//保存任务
  191. Integer flowId = flowManage.genCamelFile(obj.getId(),obj.getFlowTempId(),cron);
  192. if (flowId!=null){
  193. obj.setFlowId(flowId);
  194. datacollectDao.updateEntity(obj);
  195. saveJobDataset(obj.getId(), jobDataset);
  196. return new ActionResult(true, "新增任务成功!");
  197. }else {
  198. datacollectDao.deleteEntity(obj);
  199. return new ActionResult(false, "新增任务失败-关联流程环节失败!");
  200. }
  201. //quartz新增任务
  202. // quartzManager.addJob(obj.getId(), obj.getJobContentType(), obj.getJobContent(), obj.getJobNextTime(), cron);
  203. }
  204. /**
  205. * 修改任务
  206. */
  207. @Transactional
  208. public ActionResult updateJob(RsJobConfig obj, String cron, String jobDataset) throws Exception {
  209. Integer flowId = flowManage.updateCamelFile(obj.getId(),obj.getFlowTempId(), obj.getFlowId(), cron);
  210. if (flowId!=null){
  211. datacollectDao.updateEntity(obj);
  212. saveJobDataset(obj.getId(), jobDataset);
  213. //quartz修改cron表达式
  214. // quartzManager.modifyJob(obj.getId(), obj.getJobContentType(), obj.getJobContent(), obj.getJobNextTime(), cron);
  215. return new ActionResult(true, "修改成功!");
  216. }else {
  217. return new ActionResult(false, "修改失败,关联流程失败!");
  218. }
  219. }
  220. /**
  221. * 修改任务
  222. */
  223. @Transactional
  224. public ActionResult updateJob(RsJobConfig obj) throws Exception {
  225. datacollectDao.updateEntity(obj);
  226. return new ActionResult(true, "修改成功!");
  227. }
  228. /**
  229. * 修改任务状态
  230. */
  231. @Transactional
  232. public ActionResult validJob(String jobId, String valid) throws Exception {
  233. datacollectDao.validJob(jobId, valid);
  234. //暂停Quartz任务
  235. if (valid.equals("0")) {
  236. // quartzManager.pauseJob(jobId);
  237. } else { //恢复Quartz任务
  238. // quartzManager.resumeJob(jobId);
  239. }
  240. return new ActionResult(true, "状态修改成功!");
  241. }
  242. /**
  243. * 删除任务
  244. */
  245. @Transactional
  246. public ActionResult deleteJob(String jobId) throws Exception {
  247. //清空任务关联数据集
  248. datacollectDao.deleteJobDatasetByJobId(jobId);
  249. //删除相关流程记录
  250. RsJobConfig entity = datacollectDao.getEntity(RsJobConfig.class, jobId);
  251. if (entity.getFlowId()!=null){
  252. flowManage.deleteFlow(entity.getFlowId());
  253. }
  254. datacollectDao.deleteEntity(entity);
  255. //删除Quartz任务
  256. // quartzManager.removeJob(jobId);
  257. return new ActionResult(true, "删除成功!");
  258. }
  259. /**
  260. * 数据集数据源管理列表(包含全部数据集)
  261. */
  262. public DataGridResult getDatasetSource(String stdVersion) throws Exception {
  263. //获取版本下全部数据集
  264. List<StdDataSetModel> stdDataSetModelList = stdManager.getDatasetByVersion(stdVersion);
  265. String jsonlist = objectMapper.writeValueAsString(stdDataSetModelList);
  266. ArrayNode datasetList = objectMapper.readValue(jsonlist, ArrayNode.class);
  267. //获取已配数据集
  268. List<RsDatasourceDataset> jobDataset = datacollectDao.getDatasourceDataset(stdVersion);
  269. DataGridResult re = new DataGridResult();
  270. List<JsonNode> list = new ArrayList<>();
  271. for (JsonNode obj : datasetList) {
  272. ObjectNode dd = objectMapper.createObjectNode();
  273. dd.put("datasetId", obj.get("id").asText());
  274. dd.put("datasetCode", obj.get("code").asText());
  275. dd.put("datasetName", obj.get("name").asText());
  276. dd.put("stdVersion", stdVersion);
  277. if (jobDataset != null && jobDataset.size() > 0) {
  278. for (RsDatasourceDataset rdd : jobDataset) {
  279. if (rdd.getDatasetId().equals(obj.get("id").asText())) {
  280. dd.put("orgId", rdd.getOrgId());
  281. dd.put("datasourceId", rdd.getDatasourceId());
  282. dd.put("id", rdd.getId());
  283. break;
  284. }
  285. }
  286. }
  287. list.add(dd);
  288. }
  289. re.setDetailModelList(list);
  290. return re;
  291. }
  292. /**
  293. * 保存数据集数据源配置
  294. */
  295. @Transactional
  296. public ActionResult saveDatasetSource(String json) throws Exception {
  297. //TODO: Use jackson
  298. // JSONArray jsonList = JSONArray.fromObject(json);
  299. //
  300. //
  301. // for (Object item : jsonList) {
  302. // JSONObject obj = JSONObject.fromObject(item);
  303. // if (obj.containsKey("id") && obj.getString("id").length() > 0) {
  304. // String id = obj.getString("id");
  305. // //修改
  306. // RsDatasourceDataset dd = (RsDatasourceDataset) JSONObject.toBean(obj, RsDatasourceDataset.class);
  307. // datacollectDao.updateEntity(dd);
  308. //// if(obj.containsKey("datasourceId") && obj.getString("datasourceId").length()>0)
  309. //// {
  310. //// RsDatasourceDataset dd = (RsDatasourceDataset)JSONObject.toBean(obj,RsDatasourceDataset.class);
  311. //// datacollectDao.updateEntity(dd);
  312. //// }
  313. //// //删除
  314. //// else{
  315. //// datacollectDao.deleteEntity(RsDatasourceDataset.class,id);
  316. //// }
  317. // } else {
  318. // //新增
  319. // RsDatasourceDataset dd = (RsDatasourceDataset) JSONObject.toBean(obj, RsDatasourceDataset.class);
  320. // datacollectDao.saveEntity(dd);
  321. // }
  322. // }
  323. return new ActionResult(true, "保存成功!");
  324. }
  325. /************************* 数据集数据源管理 ***************************************************/
  326. /**
  327. * 获取任务详细日志列表
  328. */
  329. public DataGridResult getJobLogDetail(Map<String, Object> conditionMap, int page, int rows) throws Exception {
  330. return datacollectDao.getJobLogDetail(conditionMap, page, rows);
  331. }
  332. /**
  333. * 获取任务日志列表
  334. */
  335. public DataGridResult getJobLog(Map<String, Object> conditionMap, int page, int rows) throws Exception {
  336. DataGridResult re = datacollectDao.getJobLog(conditionMap, page, rows);
  337. List<RsJobLog> logList = re.getDetailModelList();
  338. List<DtoJobLog> list = new ArrayList<>();
  339. for (RsJobLog log : logList) {
  340. DtoJobLog dto = new DtoJobLog();
  341. BeanUtils.copyProperties(log, dto);
  342. List<Map<String, Object>> maps = datacollectDao.getJobLogCount(log.getId());
  343. if (maps != null && maps.size() > 0) {
  344. int count = Integer.parseInt(String.valueOf(maps.get(0).get("count")));
  345. int success = Integer.parseInt(String.valueOf(maps.get(0).get("success")));
  346. int repeat_num = Integer.parseInt(String.valueOf(maps.get(0).get("repeat_num")));
  347. dto.setCount(count);
  348. dto.setSuccess(success);
  349. dto.setRepeatNum(repeat_num);
  350. } else {
  351. dto.setCount(0);
  352. dto.setSuccess(0);
  353. dto.setRepeatNum(0);
  354. }
  355. list.add(dto);
  356. }
  357. re.setDetailModelList(list);
  358. return re;
  359. }
  360. /********************** 任务日志管理 *******************************************/
  361. /**
  362. * 任务详细根据数据集分组
  363. *
  364. * @return
  365. */
  366. public DataGridResult getJobLogDataset(String logId) throws Exception {
  367. DataGridResult re = new DataGridResult();
  368. List<SimpleChartItem> maps = datacollectDao.getJobLogDataset(logId);
  369. re.setDetailModelList(maps);
  370. return re;
  371. }
  372. /**
  373. * 翻译Cron表达式
  374. *
  375. * @return
  376. */
  377. private String translationCron(String cron) {
  378. try {
  379. if (cron != null && cron.length() > 0) {
  380. String re = "";
  381. String[] items = cron.split(" ");
  382. if (!items[5].equals("?")) //周
  383. {
  384. String[] weekDay = items[5].split(",");
  385. for (String day : weekDay) {
  386. String WeekDay = day;
  387. if (day.equals("1")) {
  388. WeekDay = "日";
  389. } else if (day.equals("2")) {
  390. WeekDay = "一";
  391. } else if (day.equals("3")) {
  392. WeekDay = "二";
  393. } else if (day.equals("4")) {
  394. WeekDay = "三";
  395. } else if (day.equals("5")) {
  396. WeekDay = "四";
  397. } else if (day.equals("6")) {
  398. WeekDay = "五";
  399. } else if (day.equals("7")) {
  400. WeekDay = "六";
  401. }
  402. re += "星期" + WeekDay + ",";
  403. }
  404. re = re.substring(0, re.length() - 1);
  405. } else {
  406. if (!items[3].equals("*")) {
  407. String v = items[3];
  408. if (v.indexOf('/') > 0) //天
  409. {
  410. String[] varry = v.split("/");
  411. re = "每隔" + varry[1] + "天";
  412. } else {//月
  413. if (v.equals("1")) {
  414. re = "每月第一天";
  415. } else if (v.equals("L")) {
  416. re = "每月最后一天";
  417. } else {
  418. re = "每月第" + v + "天";
  419. }
  420. }
  421. } else {
  422. String v1 = items[1];
  423. String v2 = items[2];
  424. if (v1.indexOf('/') > 0) //分
  425. {
  426. String[] varry = v1.split("/");
  427. re = "每隔" + varry[1] + "分";
  428. } else { //时
  429. String[] varry = v2.split("/");
  430. re = "每隔" + varry[1] + "时";
  431. }
  432. }
  433. }
  434. return re + "执行";
  435. }
  436. return cron;
  437. } catch (Exception ex) {
  438. return cron;
  439. }
  440. }
  441. /**
  442. * 保存任务关联数据集
  443. */
  444. private void saveJobDataset(String jobId, String jobDataset) throws Exception {
  445. if(jobDataset!=null&&jobDataset.length()>0) {
  446. //清空任务关联数据集
  447. datacollectDao.deleteJobDatasetByJobId(jobId);
  448. ArrayNode array = objectMapper.readValue(jobDataset,ArrayNode.class);
  449. if (array != null && array.size() > 0) {
  450. for (JsonNode obj : array) {
  451. RsJobDataset rs = new RsJobDataset();
  452. if(obj.get("jobDatasetCondition")!=null&&obj.get("jobDatasetCondition").asText()!="null")
  453. {
  454. rs.setJobDatasetCondition(obj.get("jobDatasetCondition").asText());
  455. }
  456. if(obj.get("jobDatasetId")!=null&&obj.get("jobDatasetId").asText()!="null") {
  457. rs.setJobDatasetId(obj.get("jobDatasetId").asText());
  458. }
  459. if(obj.get("jobDatasetKey")!=null&&obj.get("jobDatasetKey").asText()!="null") {
  460. rs.setJobDatasetKey(obj.get("jobDatasetKey").asText());
  461. }
  462. if(obj.get("jobDatasetKeytype")!=null&&obj.get("jobDatasetKeytype").asText()!="null") {
  463. rs.setJobDatasetKeytype(obj.get("jobDatasetKeytype").asText());
  464. }
  465. if(obj.get("jobDatasetKeyvalue")!=null&&obj.get("jobDatasetKeyvalue").asText()!="null") {
  466. rs.setJobDatasetKeyvalue(obj.get("jobDatasetKeyvalue").asText());
  467. }
  468. if(obj.get("jobDatasetName")!=null&&obj.get("jobDatasetName").asText()!="null") {
  469. rs.setJobDatasetName(obj.get("jobDatasetName").asText());
  470. }
  471. rs.setJobId(jobId);
  472. datacollectDao.saveEntity(rs);
  473. }
  474. }
  475. }
  476. else{
  477. return;
  478. }
  479. }
  480. }