123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159 |
- package com.yihu.ehr.resolve.job;
- import com.fasterxml.jackson.databind.ObjectMapper;
- import com.yihu.ehr.profile.ArchiveStatus;
- import com.yihu.ehr.profile.ProfileType;
- import com.yihu.ehr.profile.exception.ResolveException;
- import com.yihu.ehr.profile.family.ResourceCells;
- import com.yihu.ehr.profile.queue.RedisCollection;
- import com.yihu.ehr.fastdfs.FastDFSUtil;
- import com.yihu.ehr.lang.SpringContext;
- import com.yihu.ehr.model.packs.EsSimplePackage;
- import com.yihu.ehr.profile.exception.IllegalJsonDataException;
- import com.yihu.ehr.profile.exception.IllegalJsonFileException;
- import com.yihu.ehr.resolve.model.stage1.OriginalPackage;
- import com.yihu.ehr.resolve.model.stage2.ResourceBucket;
- import com.yihu.ehr.resolve.service.resource.stage1.ResolveService;
- import com.yihu.ehr.resolve.service.resource.stage2.IdentifyService;
- import com.yihu.ehr.resolve.service.resource.stage2.PackMillService;
- import com.yihu.ehr.resolve.service.resource.stage2.ResourceService;
- import com.yihu.ehr.resolve.log.PackResolveLogger;
- import com.yihu.ehr.resolve.service.resource.stage2.StatusReportService;
- import com.yihu.ehr.resolve.util.LocalTempPathUtil;
- import com.yihu.ehr.util.datetime.DateUtil;
- import net.lingala.zip4j.exception.ZipException;
- import org.apache.commons.lang3.StringUtils;
- import org.apache.commons.logging.Log;
- import org.apache.commons.logging.LogFactory;
- import org.quartz.*;
- import org.springframework.data.redis.core.RedisTemplate;
- import org.springframework.kafka.core.KafkaTemplate;
- import org.springframework.stereotype.Component;
- import java.io.Serializable;
- import java.util.Date;
- import java.util.HashMap;
- import java.util.Map;
- /**
- * 档案包解析作业。
- *
- * @author Sand
- * @version 1.0
- * @created 2016.03.28 11:30
- */
- @Component
- @DisallowConcurrentExecution
- public class PackageResolveJob implements InterruptableJob {
- private static final long DAY = 1000 * 60 * 60 * 24;
- private final Log logger = LogFactory.getLog(this.getClass());
- @Override
- public void interrupt() throws UnableToInterruptJobException {
- }
- @Override
- public void execute(JobExecutionContext context) {
- StatusReportService statusReportService = SpringContext.getService(StatusReportService.class);
- //该对象要采用名称的方式获取,否则:expected single matching bean but found 3: redisTemplate,sessionRedisTemplate,stringRedisTemplate
- RedisTemplate<String, Serializable> redisTemplate = SpringContext.getService("redisTemplate");
- ObjectMapper objectMapper = SpringContext.getService(ObjectMapper.class);
- Serializable serializable = redisTemplate.opsForList().rightPop(RedisCollection.ResolveQueue);
- if (null == serializable) {
- serializable = redisTemplate.opsForSet().pop(RedisCollection.ResolveQueueVice);
- }
- EsSimplePackage pack = null;
- try {
- if (serializable != null) {
- String packStr = serializable.toString();
- pack = objectMapper.readValue(packStr, EsSimplePackage.class);
- }
- if (pack != null) {
- //判断是否已经解析成功,或者正在解析(由于部署多个服务,运行的时间差可能导致多次加入队列,造成多次解析)
- Map<String, Object> map = statusReportService.getJsonArchiveById(pack.get_id());
- if(map != null && ("3".equals(map.get("archive_status")+"") || "1".equals(map.get("archive_status")+""))){
- return;
- }
- PackResolveLogger.info("开始入库:" + pack.get_id() + ", Timestamp:" + new Date());
- statusReportService.reportStatus(pack.get_id(), ArchiveStatus.Acquired, 0, "正在入库中", null);
- OriginalPackage originalPackage = doResolve(pack, statusReportService);
- //发送省平台上传消息
- redisTemplate.opsForList().leftPush(RedisCollection.ProvincialPlatformQueue, objectMapper.writeValueAsString(pack));
- //发送事件处理消息
- if (originalPackage.getProfileType() == ProfileType.File || originalPackage.getProfileType() == ProfileType.Link) {
- KafkaTemplate kafkaTemplate = SpringContext.getService(KafkaTemplate.class);
- kafkaTemplate.send("svr-pack-event", "resolve", objectMapper.writeValueAsString(pack));
- }
- }
- } catch (Exception e) {
- int errorType = -2;
- if (e instanceof ZipException) {
- errorType = 1;
- } else if (e instanceof IllegalJsonFileException) {
- errorType = 2;
- } else if (e instanceof IllegalJsonDataException) {
- errorType = 3;
- } else if (e instanceof ResolveException) {
- errorType = 21; //21以下为质控和解析的公共错误
- }
- if (pack != null) {
- if (StringUtils.isNotBlank(e.getMessage())) {
- statusReportService.reportStatus(pack.get_id(), ArchiveStatus.Failed, errorType, e.getMessage(), null);
- PackResolveLogger.error(e.getMessage(), e);
- } else {
- statusReportService.reportStatus(pack.get_id(), ArchiveStatus.Failed, errorType, "Internal server error, please see task log for detail message.", null);
- PackResolveLogger.error("Empty exception message, please see the following detail info.", e);
- }
- } else {
- PackResolveLogger.error("Empty pack cause by:" + e.getMessage());
- }
- }
- }
- private OriginalPackage doResolve(EsSimplePackage pack, StatusReportService statusReportService) throws Exception {
- ResolveService resolveEngine = SpringContext.getService(ResolveService.class);
- PackMillService packMill = SpringContext.getService(PackMillService.class);
- IdentifyService identifyService = SpringContext.getService(IdentifyService.class);
- ResourceService resourceService = SpringContext.getService(ResourceService.class);
- OriginalPackage originalPackage = resolveEngine.doResolve(pack, downloadTo(pack.getRemote_path()));
- ResourceBucket resourceBucket = packMill.grindingPackModel(originalPackage);
- identifyService.identify(resourceBucket, originalPackage);
- resourceService.save(resourceBucket, originalPackage);
- //回填入库状态
- Map<String, Object> map = new HashMap();
- map.put("defect", resourceBucket.getQcMetadataRecords().getRecords().isEmpty() ? 0 : 1); //是否解析异常
- map.put("patient_name", resourceBucket.getBasicRecord(ResourceCells.PATIENT_NAME));
- map.put("profile_id", resourceBucket.getId());
- map.put("demographic_id", resourceBucket.getBasicRecord(ResourceCells.DEMOGRAPHIC_ID));
- map.put("event_type", originalPackage.getEventType() == null ? -1 : originalPackage.getEventType().getType());
- map.put("event_no", originalPackage.getEventNo());
- map.put("event_date", DateUtil.toStringLong(originalPackage.getEventTime()));
- map.put("patient_id", originalPackage.getPatientId());
- map.put("dept", resourceBucket.getBasicRecord(ResourceCells.DEPT_CODE));
- long delay = pack.getReceive_date().getTime() - originalPackage.getEventTime().getTime();
- map.put("delay", delay % DAY > 0 ? delay / DAY + 1 : delay / DAY);
- map.put("re_upload_flg", String.valueOf(originalPackage.isReUploadFlg()));
- statusReportService.reportStatus(pack.get_id(), ArchiveStatus.Finished, 0, "resolve success", map);
- //回填解析数据
- pack.setRowkey(resourceBucket.getId());
- pack.setPatient_id(originalPackage.getPatientId());
- pack.setEvent_date(DateUtil.toStringLong(originalPackage.getEventTime()));
- pack.setEvent_no(originalPackage.getEventNo());
- pack.setEvent_type(originalPackage.getEventType() == null ? -1 : originalPackage.getEventType().getType());
- pack.setOrg_code(originalPackage.getOrgCode());
- pack.setOrg_name(resourceBucket.getBasicRecord(ResourceCells.ORG_NAME));
- pack.setOrg_area(resourceBucket.getBasicRecord(ResourceCells.ORG_AREA));
- pack.setPatient_name(resourceBucket.getBasicRecord(ResourceCells.PATIENT_NAME));
- pack.setIdcard_no(resourceBucket.getBasicRecord(ResourceCells.DEMOGRAPHIC_ID));
- pack.setVersion(originalPackage.getCdaVersion());
- return originalPackage;
- }
- private String downloadTo(String filePath) throws Exception {
- FastDFSUtil fastDFSUtil = SpringContext.getService(FastDFSUtil.class);
- String[] tokens = filePath.split(":");
- return fastDFSUtil.download(tokens[0], tokens[1], LocalTempPathUtil.getTempPathWithUUIDSuffix());
- }
- }
|