PackageResolveJob.java 8.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159
  1. package com.yihu.ehr.resolve.job;
  2. import com.fasterxml.jackson.databind.ObjectMapper;
  3. import com.yihu.ehr.profile.ArchiveStatus;
  4. import com.yihu.ehr.profile.ProfileType;
  5. import com.yihu.ehr.profile.exception.ResolveException;
  6. import com.yihu.ehr.profile.family.ResourceCells;
  7. import com.yihu.ehr.profile.queue.RedisCollection;
  8. import com.yihu.ehr.fastdfs.FastDFSUtil;
  9. import com.yihu.ehr.lang.SpringContext;
  10. import com.yihu.ehr.model.packs.EsSimplePackage;
  11. import com.yihu.ehr.profile.exception.IllegalJsonDataException;
  12. import com.yihu.ehr.profile.exception.IllegalJsonFileException;
  13. import com.yihu.ehr.resolve.model.stage1.OriginalPackage;
  14. import com.yihu.ehr.resolve.model.stage2.ResourceBucket;
  15. import com.yihu.ehr.resolve.service.resource.stage1.ResolveService;
  16. import com.yihu.ehr.resolve.service.resource.stage2.IdentifyService;
  17. import com.yihu.ehr.resolve.service.resource.stage2.PackMillService;
  18. import com.yihu.ehr.resolve.service.resource.stage2.ResourceService;
  19. import com.yihu.ehr.resolve.log.PackResolveLogger;
  20. import com.yihu.ehr.resolve.service.resource.stage2.StatusReportService;
  21. import com.yihu.ehr.resolve.util.LocalTempPathUtil;
  22. import com.yihu.ehr.util.datetime.DateUtil;
  23. import net.lingala.zip4j.exception.ZipException;
  24. import org.apache.commons.lang3.StringUtils;
  25. import org.apache.commons.logging.Log;
  26. import org.apache.commons.logging.LogFactory;
  27. import org.quartz.*;
  28. import org.springframework.data.redis.core.RedisTemplate;
  29. import org.springframework.kafka.core.KafkaTemplate;
  30. import org.springframework.stereotype.Component;
  31. import java.io.Serializable;
  32. import java.util.Date;
  33. import java.util.HashMap;
  34. import java.util.Map;
  35. /**
  36. * 档案包解析作业。
  37. *
  38. * @author Sand
  39. * @version 1.0
  40. * @created 2016.03.28 11:30
  41. */
  42. @Component
  43. @DisallowConcurrentExecution
  44. public class PackageResolveJob implements InterruptableJob {
  45. private static final long DAY = 1000 * 60 * 60 * 24;
  46. private final Log logger = LogFactory.getLog(this.getClass());
  47. @Override
  48. public void interrupt() throws UnableToInterruptJobException {
  49. }
  50. @Override
  51. public void execute(JobExecutionContext context) {
  52. StatusReportService statusReportService = SpringContext.getService(StatusReportService.class);
  53. //该对象要采用名称的方式获取,否则:expected single matching bean but found 3: redisTemplate,sessionRedisTemplate,stringRedisTemplate
  54. RedisTemplate<String, Serializable> redisTemplate = SpringContext.getService("redisTemplate");
  55. ObjectMapper objectMapper = SpringContext.getService(ObjectMapper.class);
  56. Serializable serializable = redisTemplate.opsForList().rightPop(RedisCollection.ResolveQueue);
  57. if (null == serializable) {
  58. serializable = redisTemplate.opsForSet().pop(RedisCollection.ResolveQueueVice);
  59. }
  60. EsSimplePackage pack = null;
  61. try {
  62. if (serializable != null) {
  63. String packStr = serializable.toString();
  64. pack = objectMapper.readValue(packStr, EsSimplePackage.class);
  65. }
  66. if (pack != null) {
  67. //判断是否已经解析成功,或者正在解析(由于部署多个服务,运行的时间差可能导致多次加入队列,造成多次解析)
  68. Map<String, Object> map = statusReportService.getJsonArchiveById(pack.get_id());
  69. if(map != null && ("3".equals(map.get("archive_status")+"") || "1".equals(map.get("archive_status")+""))){
  70. return;
  71. }
  72. PackResolveLogger.info("开始入库:" + pack.get_id() + ", Timestamp:" + new Date());
  73. statusReportService.reportStatus(pack.get_id(), ArchiveStatus.Acquired, 0, "正在入库中", null);
  74. OriginalPackage originalPackage = doResolve(pack, statusReportService);
  75. //发送省平台上传消息
  76. redisTemplate.opsForList().leftPush(RedisCollection.ProvincialPlatformQueue, objectMapper.writeValueAsString(pack));
  77. //发送事件处理消息
  78. if (originalPackage.getProfileType() == ProfileType.File || originalPackage.getProfileType() == ProfileType.Link) {
  79. KafkaTemplate kafkaTemplate = SpringContext.getService(KafkaTemplate.class);
  80. kafkaTemplate.send("svr-pack-event", "resolve", objectMapper.writeValueAsString(pack));
  81. }
  82. }
  83. } catch (Exception e) {
  84. int errorType = -2;
  85. if (e instanceof ZipException) {
  86. errorType = 1;
  87. } else if (e instanceof IllegalJsonFileException) {
  88. errorType = 2;
  89. } else if (e instanceof IllegalJsonDataException) {
  90. errorType = 3;
  91. } else if (e instanceof ResolveException) {
  92. errorType = 21; //21以下为质控和解析的公共错误
  93. }
  94. if (pack != null) {
  95. if (StringUtils.isNotBlank(e.getMessage())) {
  96. statusReportService.reportStatus(pack.get_id(), ArchiveStatus.Failed, errorType, e.getMessage(), null);
  97. PackResolveLogger.error(e.getMessage(), e);
  98. } else {
  99. statusReportService.reportStatus(pack.get_id(), ArchiveStatus.Failed, errorType, "Internal server error, please see task log for detail message.", null);
  100. PackResolveLogger.error("Empty exception message, please see the following detail info.", e);
  101. }
  102. } else {
  103. PackResolveLogger.error("Empty pack cause by:" + e.getMessage());
  104. }
  105. }
  106. }
  107. private OriginalPackage doResolve(EsSimplePackage pack, StatusReportService statusReportService) throws Exception {
  108. ResolveService resolveEngine = SpringContext.getService(ResolveService.class);
  109. PackMillService packMill = SpringContext.getService(PackMillService.class);
  110. IdentifyService identifyService = SpringContext.getService(IdentifyService.class);
  111. ResourceService resourceService = SpringContext.getService(ResourceService.class);
  112. OriginalPackage originalPackage = resolveEngine.doResolve(pack, downloadTo(pack.getRemote_path()));
  113. ResourceBucket resourceBucket = packMill.grindingPackModel(originalPackage);
  114. identifyService.identify(resourceBucket, originalPackage);
  115. resourceService.save(resourceBucket, originalPackage);
  116. //回填入库状态
  117. Map<String, Object> map = new HashMap();
  118. map.put("defect", resourceBucket.getQcMetadataRecords().getRecords().isEmpty() ? 0 : 1); //是否解析异常
  119. map.put("patient_name", resourceBucket.getBasicRecord(ResourceCells.PATIENT_NAME));
  120. map.put("profile_id", resourceBucket.getId());
  121. map.put("demographic_id", resourceBucket.getBasicRecord(ResourceCells.DEMOGRAPHIC_ID));
  122. map.put("event_type", originalPackage.getEventType() == null ? -1 : originalPackage.getEventType().getType());
  123. map.put("event_no", originalPackage.getEventNo());
  124. map.put("event_date", DateUtil.toStringLong(originalPackage.getEventTime()));
  125. map.put("patient_id", originalPackage.getPatientId());
  126. map.put("dept", resourceBucket.getBasicRecord(ResourceCells.DEPT_CODE));
  127. long delay = pack.getReceive_date().getTime() - originalPackage.getEventTime().getTime();
  128. map.put("delay", delay % DAY > 0 ? delay / DAY + 1 : delay / DAY);
  129. map.put("re_upload_flg", String.valueOf(originalPackage.isReUploadFlg()));
  130. statusReportService.reportStatus(pack.get_id(), ArchiveStatus.Finished, 0, "resolve success", map);
  131. //回填解析数据
  132. pack.setRowkey(resourceBucket.getId());
  133. pack.setPatient_id(originalPackage.getPatientId());
  134. pack.setEvent_date(DateUtil.toStringLong(originalPackage.getEventTime()));
  135. pack.setEvent_no(originalPackage.getEventNo());
  136. pack.setEvent_type(originalPackage.getEventType() == null ? -1 : originalPackage.getEventType().getType());
  137. pack.setOrg_code(originalPackage.getOrgCode());
  138. pack.setOrg_name(resourceBucket.getBasicRecord(ResourceCells.ORG_NAME));
  139. pack.setOrg_area(resourceBucket.getBasicRecord(ResourceCells.ORG_AREA));
  140. pack.setPatient_name(resourceBucket.getBasicRecord(ResourceCells.PATIENT_NAME));
  141. pack.setIdcard_no(resourceBucket.getBasicRecord(ResourceCells.DEMOGRAPHIC_ID));
  142. pack.setVersion(originalPackage.getCdaVersion());
  143. return originalPackage;
  144. }
  145. private String downloadTo(String filePath) throws Exception {
  146. FastDFSUtil fastDFSUtil = SpringContext.getService(FastDFSUtil.class);
  147. String[] tokens = filePath.split(":");
  148. return fastDFSUtil.download(tokens[0], tokens[1], LocalTempPathUtil.getTempPathWithUUIDSuffix());
  149. }
  150. }