|
- package com.yihu.hos.crawler.service;
- import com.fasterxml.jackson.databind.JsonNode;
- import com.fasterxml.jackson.databind.ObjectMapper;
- import com.fasterxml.jackson.databind.node.ArrayNode;
- import com.fasterxml.jackson.databind.node.ObjectNode;
- import com.yihu.hos.core.datatype.StringUtil;
- import com.yihu.hos.crawler.dao.CrawlerDatasetDao;
- import com.yihu.hos.crawler.dao.CrawlerFlowDao;
- import com.yihu.hos.crawler.dao.CrawlerFlowHeadDao;
- import com.yihu.hos.crawler.model.flow.CrawlerDataSetModel;
- import com.yihu.hos.crawler.model.flow.CrawlerFlowHeadModel;
- import com.yihu.hos.crawler.model.flow.CrawlerFlowModel;
- import com.yihu.hos.crawler.model.flow.resultModel.*;
- import com.yihu.hos.datacollect.model.DtoJobDataset;
- import com.yihu.hos.standard.model.adapter.AdapterDatasetModel;
- import com.yihu.hos.standard.model.adapter.AdapterMetadataModel;
- import com.yihu.hos.standard.model.adapter.AdapterSchemeVersionModel;
- import com.yihu.hos.standard.model.adapter.resultModel.AdapterSchemeResultModel;
- import com.yihu.hos.standard.model.adapter.resultModel.AdapterSchemeVersionResultDetailModel;
- import com.yihu.hos.standard.service.adapter.AdapterDatasetService;
- import com.yihu.hos.standard.service.adapter.AdapterMetadataService;
- import com.yihu.hos.standard.service.adapter.AdapterSchemeService;
- import com.yihu.hos.standard.service.adapter.AdapterSchemeVersionService;
- import com.yihu.hos.standard.service.bo.AdapterVersion;
- import com.yihu.hos.web.framework.constrant.SqlConstants;
- import com.yihu.hos.web.framework.model.ActionResult;
- import com.yihu.hos.web.framework.model.DetailModelResult;
- import org.apache.commons.lang3.StringUtils;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.stereotype.Service;
- import org.springframework.transaction.annotation.Transactional;
- import javax.annotation.Resource;
- import java.io.IOException;
- import java.util.*;
- @Transactional
- @Service("CrawlerService")
- public class CrawlerService {
- public static final String BEAN_ID = "CrawlerService";
- @Resource(name = AdapterSchemeVersionService.BEAN_ID)
- private AdapterSchemeVersionService adapterSchemeVersionService;
- @Resource(name = AdapterDatasetService.BEAN_ID)
- private AdapterDatasetService adapterDatasetService;
- @Resource(name = CrawlerDatasetDao.BEAN_ID)
- private CrawlerDatasetDao crawlerDatasetDao;
- @Resource(name = CrawlerFlowDao.BEAN_ID)
- private CrawlerFlowDao crawlerFlowDao;
- @Resource(name = CrawlerFlowHeadDao.BEAN_ID)
- private CrawlerFlowHeadDao crawlerFlowHeadDao;
- @Resource(name = AdapterMetadataService.BEAN_ID)
- private AdapterMetadataService adapterMetadataService;
- private static Map<Integer, List<FlowLines>> lineCache = new HashMap<>();
- @Resource(name = AdapterSchemeService.BEAN_ID)
- private AdapterSchemeService adapterSchemeService;
- @Autowired
- private ObjectMapper objectMapper;
- public static Map<Integer, List<FlowLines>> getLineCache() {
- return lineCache;
- }
- /**
- * 保存编排映射关系
- *
- * @param version 适配方案版本
- * @param json 映射数据
- * @return
- * @throws Exception
- */
- public ActionResult saveDataSetRelation(String version, String json) throws Exception {
- ObjectNode root = objectMapper.readValue(json,ObjectNode.class);
- ArrayNode jsonList = (ArrayNode) root.get("lines");
- ArrayNode entrances = (ArrayNode) root.get("entrances");
- AdapterSchemeVersionModel versionModel = (AdapterSchemeVersionModel) adapterSchemeVersionService.get(Integer.valueOf(version));
- if (jsonList != null && jsonList.size() > 0) {
- // 删除旧关联关系
- crawlerFlowDao.deleteCrawlerFlowList(versionModel.getId());
- for (JsonNode obj : jsonList) {
- String from = obj.get("from").toString();
- String to = obj.get("to").toString();
- String fromPort = obj.get("fromPort").toString();
- String toPort = obj.get("toPort").toString();
- // 保存编排关系
- CrawlerFlowModel crawlerFlow = new CrawlerFlowModel();
- crawlerFlow.setDatasetCode(to);
- crawlerFlow.setInputDatasetCode(from);
- crawlerFlow.setMetadataCode(toPort);
- crawlerFlow.setInputMetadataCode(fromPort);
- crawlerFlow.setSchemeVersionId(versionModel.getId());
- crawlerFlowDao.saveEntity(crawlerFlow);
- }
- // 删除编排头部信息
- crawlerFlowHeadDao.deleteCrawlerFlowHeadList(versionModel.getId());
- for (JsonNode obj : entrances) {
- String dataSet = obj.get("dataSet").toString();
- String meta = obj.get("meta").toString();
- // 保存入口数据集
- CrawlerFlowHeadModel headModel = new CrawlerFlowHeadModel();
- headModel.setSchemeVersionId(versionModel.getId());
- headModel.setDatasetCode(dataSet);
- headModel.setMetadataCode(meta);
- crawlerFlowHeadDao.saveEntity(headModel);
- }
- }
- return new ActionResult(true, "保存成功!");
- }
- /**
- * 获取任务编排数据集列表
- *
- * @param schemeVersionId
- * @return
- * @throws Exception
- */
- public DetailModelResult getSchemeDataset(Integer schemeVersionId, String datasetName) throws Exception {
- AdapterSchemeVersionModel versionModel = (AdapterSchemeVersionModel) adapterSchemeVersionService.get(schemeVersionId);
- //获取适配数据集总和
- Map<String, Object> map = new HashMap<String, Object>();
- String condition = null;
- if (datasetName != null && !"".equals(datasetName)) {
- map.put("name", datasetName);
- condition = objectMapper.writeValueAsString(condition);
- }
- List<AdapterDatasetModel> adapterDatasetModelList = adapterDatasetService.getDatasetList(AdapterDatasetModel.class, versionModel.getVersion(), condition, null, null, null);
- // 筛选掉 未适配数据
- List<AdapterDatasetModel> nAdapterDataSetModelList = new ArrayList<>();
- for (AdapterDatasetModel datasetModel : adapterDatasetModelList) {
- if (datasetModel.getAdapterDatasetId() != null && datasetModel.getAdapterDatasetName() != null && datasetModel.getAdapterDatasetCode() != null) {
- nAdapterDataSetModelList.add(datasetModel);
- }
- }
- //获取编排数据集
- List<CrawlerDataSetModel> crawlerDataset = crawlerDatasetDao.getCrawlerDatasetList(versionModel.getId());
- DetailModelResult re = new DetailModelResult();
- List<CrawlerDatasetResultDetailModel> list = new ArrayList<>();
- for (AdapterDatasetModel datasetModel : nAdapterDataSetModelList) {
- if (!StringUtil.isStrEmpty(datasetModel.getAdapterDatasetCode())) {
- List<AdapterMetadataModel> metadatas = adapterMetadataService.getAdapterMetadataByDataset(versionModel.getVersion(), datasetModel.getStdDatasetId());
- if (metadatas != null && metadatas.size() > 0) {
- CrawlerDatasetResultDetailModel obj = new CrawlerDatasetResultDetailModel();
- obj.setSchemeVersionId(schemeVersionId);
- obj.setDatasetId(datasetModel.getStdDatasetId());
- obj.setDatasetCode(datasetModel.getStdDatasetCode());
- obj.setDatasetName(datasetModel.getStdDatasetName());
- obj.setSchemeId(datasetModel.getSchemeId());
- if (crawlerDataset != null && crawlerDataset.size() > 0) {
- for (CrawlerDataSetModel cDataSet : crawlerDataset) {
- if (cDataSet.getDatasetId().equals(datasetModel.getStdDatasetId())) {
- obj.setSchemeVersionId(cDataSet.getSchemeVersionId());
- obj.setChecked("1");
- break;
- }
- }
- }
- list.add(obj);
- }
- }
- }
- re.setDetailModelList(list);
- return re;
- }
- /**
- * 保存任务编排数据
- *
- * @param json
- * @param rows
- * @param page @return
- * @throws Exception
- */
- public void saveJobData(String json, Integer rows, Integer page) throws Exception {
- ArrayNode jsonList = objectMapper.readValue(json,ArrayNode.class);
- //清空当页数据
- deleteCurrentPage(rows, page);
- for (JsonNode obj : jsonList) {
- if (obj.has("schemeId") && obj.has("versionId")) {
- String schemeId = obj.get("schemeId").toString();
- String versionId = obj.get("versionId").toString();
- AdapterSchemeVersionModel versionModel = (AdapterSchemeVersionModel) adapterSchemeVersionService.get(Integer.valueOf(versionId));
- if (versionModel != null) {
- AdapterVersion adapterVersion = new AdapterVersion(versionModel.getVersion());
- //删除已存在的数据集
- crawlerDatasetDao.deleteCrawlerDatasetList(versionModel.getId());
- List<AdapterDatasetModel> adapterDatasetModelList;
- //根据id字符串获取编排数据集
- if (obj.has("dataSets")) {
- List<Integer> newDatasetIdList = new ArrayList<>();
- String dataSetStr = obj.get("dataSets").toString();
- if (StringUtils.isNotBlank(dataSetStr)) {
- String[] IdList = dataSetStr.split(",");
- for (String aIdList : IdList) {
- if (!Objects.equals(aIdList, "")) {
- Integer DaSetId = Integer.valueOf(aIdList);
- newDatasetIdList.add(DaSetId);
- }
- }
- }
- adapterDatasetModelList = adapterDatasetService.getListByAdapterDatasetIdList(adapterVersion, newDatasetIdList);
- for (AdapterDatasetModel model : adapterDatasetModelList) {
- CrawlerDataSetModel dataSetModel = new CrawlerDataSetModel();
- dataSetModel.setSchemeId(Integer.valueOf(schemeId));
- dataSetModel.setSchemeVersionId(versionModel.getId());
- dataSetModel.setDatasetId(model.getStdDatasetId());
- dataSetModel.setDatasetCode(model.getStdDatasetCode());
- dataSetModel.setDatasetName(model.getStdDatasetName());
- crawlerDatasetDao.saveEntity(dataSetModel);
- }
- }
- //如果保存传入编排映射关系,进行保存操作
- if (obj.has("relation") && !Objects.equals(obj.get("relation").toString(), "")) {
- saveDataSetRelation(versionId, obj.get("relation").toString());
- }
- }
- }
- }
- }
- public List<FlowEntrance> getFlowEntrances(Integer schemeVersionId) {
- List<FlowEntrance> entrances = new ArrayList<>();
- List<CrawlerFlowHeadModel> modelList = crawlerFlowHeadDao.getCrawlerFlowHeadList(schemeVersionId);
- for (CrawlerFlowHeadModel headModel : modelList) {
- FlowEntrance entrance = new FlowEntrance();
- entrance.setDataSet(headModel.getDatasetCode());
- entrance.setMeta(headModel.getMetadataCode());
- entrances.add(entrance);
- }
- return entrances;
- }
- public List<FlowLines> getFlowLines(Integer schemeVersionId) {
- List<FlowLines> lines = new ArrayList<>();
- List<CrawlerFlowModel> modelList = crawlerFlowDao.getCrawlerFlowList(schemeVersionId);
- for (CrawlerFlowModel model : modelList) {
- FlowLines line = new FlowLines();
- line.setFrom(model.getInputDatasetCode());
- line.setFromPort(model.getInputMetadataCode());
- line.setTo(model.getDatasetCode());
- line.setToPort(model.getMetadataCode());
- lines.add(line);
- }
- return lines;
- }
- /**
- * 删除编排数据
- *
- * @param version
- */
- @Transactional
- public String deleteJobData(String version) {
- try {
- AdapterSchemeVersionModel versionModel = (AdapterSchemeVersionModel) adapterSchemeVersionService.get(Integer.valueOf(version));
- if (versionModel == null || versionModel.getId() == null) {
- return "删除失败";
- }
- //删除对应表记录
- crawlerDatasetDao.deleteCrawlerDatasetList(versionModel.getId());
- crawlerFlowHeadDao.deleteCrawlerFlowHeadList(versionModel.getId());
- crawlerFlowDao.deleteCrawlerFlowList(versionModel.getId());
- } catch (Exception e) {
- e.printStackTrace();
- return "删除失败";
- }
- return SqlConstants.EMPTY;
- }
- /**
- * 数据集列表
- *
- * @param limit rows
- * @param offset page
- * @return
- */
- public DetailModelResult getDataSetResult(Integer limit, Integer offset) {
- try {
- StringBuffer stringBuffer = new StringBuffer();
- String sql = "SELECT 1 as status, a.scheme_id, a.scheme_version_id, GROUP_CONCAT(a.dataset_id SEPARATOR ',') AS datasetId, GROUP_CONCAT(a.dataset_name SEPARATOR ',') AS datasetName" +
- " FROM crawler_dataset a " +
- " GROUP BY a.scheme_id, a.scheme_version_id ";
- stringBuffer.append(sql);
- if (limit != null && offset != null) {
- if (limit > 0 && offset > 0) {
- stringBuffer.append(" LIMIT " + (offset - 1) * limit + "," + limit);
- }
- }
- stringBuffer.append(" ;");
- Integer total = crawlerDatasetDao.getTotalRows();
- List<Map<String, Object>> list = crawlerDatasetDao.queryListBySql(stringBuffer.toString());
- DetailModelResult detailModelResult = DetailModelResult.success("获取数据集成功");
- detailModelResult.setDetailModelList(list);
- detailModelResult.setTotalCount(total);
- return detailModelResult;
- } catch (Exception e) {
- e.printStackTrace();
- return DetailModelResult.error("获取数据集失败");
- }
- }
- /**
- * 获取编排已选择的适配数据集
- *
- * @param schemeVersionId
- * @param datasetIdStr
- * @return
- * @throws Exception
- */
- public List<MappingDataset> getSchemeDatasetByChecked(Integer schemeVersionId, String datasetIdStr) throws Exception {
- AdapterSchemeVersionModel versionModel = (AdapterSchemeVersionModel) adapterSchemeVersionService.get(schemeVersionId);
- //获取适配数据集总和
- AdapterVersion adapterVersion = new AdapterVersion(versionModel.getVersion());
- List<AdapterDatasetModel> adapterDatasetModelList = new ArrayList<>();
- if (datasetIdStr != null && !"".equals(datasetIdStr)) {
- String[] datasetIdList = datasetIdStr.split(",");
- List<Integer> newDatasetIdList = new ArrayList<>();
- for (String datasetId : datasetIdList) {
- if (!StringUtil.isStrEmpty(datasetId)) {
- Integer newDatasetId = Integer.parseInt(datasetId);
- newDatasetIdList.add(newDatasetId);
- }
- }
- adapterDatasetModelList = adapterDatasetService.getListByAdapterDatasetIdList(adapterVersion, newDatasetIdList);
- } else {
- adapterDatasetModelList = adapterDatasetService.getDatasetList(AdapterDatasetModel.class, versionModel.getVersion(), null, null, null, null);
- }
- //获取编排数据集
- List<CrawlerDataSetModel> crawlerDataset = crawlerDatasetDao.getCrawlerDatasetList(versionModel.getId());
- List<MappingDataset> list = new ArrayList<>();
- for (AdapterDatasetModel datasetModel : adapterDatasetModelList) {
- // if (crawlerDataset != null && crawlerDataset.size() > 0) {
- MappingDataset obj = new MappingDataset();
- // for (CrawlerDataSetModel cDataSet : crawlerDataset) {
- if (!StringUtil.isStrEmpty(datasetModel.getAdapterDatasetCode())) {
- List<MappingMetadata> metadatas = getMappingMetaDatasByDataset(versionModel.getVersion(), datasetModel.getStdDatasetId());
- obj.setId(datasetModel.getStdDatasetId());
- obj.setCode(datasetModel.getStdDatasetCode());
- obj.setName(datasetModel.getStdDatasetName());
- obj.setData(metadatas);
- // break;
- }
- // }
- list.add(obj);
- // }
- }
- return list;
- }
- /**
- * 返回前端映射数据元信息
- *
- * @param adapterVersion 适配版本号
- * @param dataSetId 适配数据集ID
- * @return
- */
- public List<MappingMetadata> getMappingMetaDatasByDataset(String adapterVersion, Integer dataSetId) {
- List<AdapterMetadataModel> adapterMetadataModels = adapterMetadataService.getAdapterMetadataByDataset(adapterVersion, dataSetId);
- AdapterDatasetModel adapterDatasetModel = adapterMetadataService.getAdapterDataset(adapterVersion, dataSetId);
- List<MappingMetadata> resultList = new ArrayList<>();
- if (adapterMetadataModels != null && adapterMetadataModels.size() > 0) {
- for (AdapterMetadataModel metadataModel : adapterMetadataModels) {
- if (!StringUtil.isStrEmpty(metadataModel.getAdapterMetadataCode())) {
- MappingMetadata metadata = new MappingMetadata();
- metadata.setId(metadataModel.getAdapterMetadataId());
- metadata.setCode(adapterDatasetModel.getAdapterDatasetCode() + "-" + metadataModel.getAdapterMetadataCode());
- metadata.setName(metadataModel.getAdapterMetadataName());
- resultList.add(metadata);
- }
- }
- }
- return resultList;
- }
- /**
- * 获取适配方案映射数据
- *
- * @param schemeVersionId
- * @param datasetIdStr
- * @return
- */
- public String getRelations(Integer schemeVersionId, String datasetIdStr, String lineStr) {
- ObjectNode jsonObject = objectMapper.createObjectNode();
- try {
- List<MappingDataset> datasets = getSchemeDatasetByChecked(schemeVersionId, datasetIdStr);
- List<FlowLines> lines = getFlowLines(schemeVersionId);
- if (StringUtil.isEmpty(lineStr)) {
- lines = getFlowLines(schemeVersionId);
- } else {
- lines = lineCache.get(schemeVersionId);
- }
- jsonObject.set("tables", objectMapper.valueToTree(datasets));
- jsonObject.set("rels", objectMapper.valueToTree(lines));
- } catch (Exception e) {
- e.printStackTrace();
- }
- return jsonObject.toString();
- }
- public void setLinesCache(Integer schemeVersionId, String lines) throws IOException {
- ObjectNode rootNode = objectMapper.readValue(lines, ObjectNode.class);
- String lineJson = rootNode.get("lines").toString();
- List<FlowLines> line = objectMapper.readValue(lineJson, List.class);
- lineCache.put(schemeVersionId, line);
- }
- public void deleteCurrentPage(Integer rows, Integer page) {
- DetailModelResult currentResut = getDataSetResult(rows, page);
- List<Map<String, Object>> list = currentResut.getDetailModelList();
- for (Map<String, Object> map : list) {
- String version = String.valueOf(map.get("scheme_version_id"));
- deleteJobData(version);
- }
- }
- public DetailModelResult getDataSetSavedResult(Integer version) {
- try {
- DetailModelResult checkedSchemeLs = getDataSetResult(null, null);
- List<Map<String, Object>> list = checkedSchemeLs.getDetailModelList();
- for (Map<String, Object> objectMap : list) {
- String versionID = objectMap.get("scheme_version_id").toString();
- if (versionID.equals(version.toString())) {
- String datasetIdStr = objectMap.get("dataSetId").toString();
- List<MappingDataset> datasetList = getSchemeDatasetByChecked(version, datasetIdStr);
- List<DtoJobDataset> rsJobDatasetList = new ArrayList<>();
- for (MappingDataset dataset : datasetList) {
- DtoJobDataset rsJobDataset = new DtoJobDataset();
- rsJobDataset.setJobDatasetId(dataset.getId().toString());
- rsJobDataset.setJobDatasetName(dataset.getName());
- rsJobDataset.setJobDatasetCode(dataset.getCode());
- rsJobDatasetList.add(rsJobDataset);
- }
- DetailModelResult result = new DetailModelResult();
- result.setDetailModelList(rsJobDatasetList);
- return result;
- }
- }
- } catch (Exception e) {
- e.printStackTrace();
- return DetailModelResult.error("获取已存任务编排数据集错误");
- }
- return DetailModelResult.error("获取已存任务编排数据集错误");
- }
- public DetailModelResult getSchemeSavedResult() {
- DetailModelResult allScheme = adapterSchemeService.getAdapterSchemeResultModelList();
- List<AdapterSchemeResultModel> allSchemeLs = allScheme.getDetailModelList();
- if (allSchemeLs != null && allSchemeLs.size() > 0) {
- DetailModelResult checkedSchemeLs = getDataSetResult(null, null);
- List<Map<String, Object>> list = checkedSchemeLs.getDetailModelList();
- Set<String> schemeIdSets = new HashSet<>();
- Set<String> versionSets = new HashSet<>();
- DetailModelResult result = new DetailModelResult();
- List<AdapterSchemeVersionResultDetailModel> versionModelList = new ArrayList<>();
- if (list != null && list.size() > 0) {
- for (Map<String, Object> objectMap : list) {
- if (objectMap.get("scheme_id") != null && objectMap.get("scheme_version_id") != null) {
- schemeIdSets.add(objectMap.get("scheme_id").toString());
- versionSets.add(objectMap.get("scheme_version_id").toString());
- }
- }
- if (schemeIdSets.size() > 0 && versionSets.size() > 0) {
- for (AdapterSchemeResultModel schemeL : allSchemeLs) {
- String schemeID = schemeL.getSchemeId().toString();
- if (schemeIdSets.contains(schemeID)) {
- String name = schemeL.getName();
- List<AdapterSchemeVersionModel> versionModels = schemeL.getVersionList();
- for (AdapterSchemeVersionModel versionModel : versionModels) {
- String versionID = versionModel.getId().toString();
- if (versionSets.contains(versionID)) {
- AdapterSchemeVersionResultDetailModel model = new AdapterSchemeVersionResultDetailModel();
- model.setSchemeName(name);
- model.setId(versionModel.getId());
- model.setName(versionModel.getName());
- model.setSchemeId(versionModel.getSchemeId());
- model.setBaseVersion(versionModel.getBaseVersion());
- model.setPath(versionModel.getPath());
- model.setPublishStatus(versionModel.getPublishStatus());
- model.setPublishTime(versionModel.getPublishTime());
- model.setPublishUser(versionModel.getPublishUser());
- model.setVersion(versionModel.getVersion());
- versionModelList.add(model);
- }
- }
- }
- }
- result.setDetailModelList(versionModelList);
- return result;
- }
- }
- }
- return DetailModelResult.error("获取已编排任务适配方案失败!");
- }
- }
|