|
@ -1,5 +1,6 @@
|
|
|
package com.yihu.ehr.datacollect.service;
|
|
|
|
|
|
import com.fasterxml.jackson.databind.JavaType;
|
|
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
|
|
import com.yihu.ehr.common.Services;
|
|
|
import com.yihu.ehr.datacollect.dao.intf.IDatacollectDao;
|
|
@ -14,6 +15,8 @@ import com.yihu.ehr.dbhelper.jdbc.DBHelper;
|
|
|
import com.yihu.ehr.dbhelper.mongodb.MongodbHelper;
|
|
|
import com.yihu.ehr.framework.constrant.DateConvert;
|
|
|
import com.yihu.ehr.framework.model.ActionResult;
|
|
|
import com.yihu.ehr.framework.util.httpclient.HttpHelper;
|
|
|
import com.yihu.ehr.framework.util.httpclient.HttpResponse;
|
|
|
import com.yihu.ehr.framework.util.log.LogService;
|
|
|
import com.yihu.ehr.resource.service.IStdService;
|
|
|
import org.json.JSONObject;
|
|
@ -22,10 +25,7 @@ import org.springframework.stereotype.Service;
|
|
|
import org.springframework.transaction.annotation.Transactional;
|
|
|
|
|
|
import javax.annotation.Resource;
|
|
|
import java.util.ArrayList;
|
|
|
import java.util.Date;
|
|
|
import java.util.List;
|
|
|
import java.util.Map;
|
|
|
import java.util.*;
|
|
|
|
|
|
/**
|
|
|
* 数据采集执行服务
|
|
@ -234,6 +234,41 @@ public class DatacollectService implements IDatacollectService {
|
|
|
return "";
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* 采集入库
|
|
|
* @return
|
|
|
*/
|
|
|
private String intoMongodb(List<JSONObject> list,String schemeVersion,String stdDatasetCode,JSONArray colList)
|
|
|
{
|
|
|
try{
|
|
|
if(list!=null && list.size()>0)
|
|
|
{
|
|
|
//字典转换
|
|
|
list = translateDict(list, colList,schemeVersion);
|
|
|
|
|
|
//采集到mongodb
|
|
|
boolean b = mongo.insert(stdDatasetCode,list);
|
|
|
if(!b)
|
|
|
{
|
|
|
if(mongo.errorMessage!=null && mongo.errorMessage.length()>0)
|
|
|
{
|
|
|
System.out.print(mongo.errorMessage);
|
|
|
return mongo.errorMessage;
|
|
|
}
|
|
|
else {
|
|
|
return "Mongodb保存失败!(表:"+stdDatasetCode+")";
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
catch (Exception e)
|
|
|
{
|
|
|
return e.getMessage();
|
|
|
}
|
|
|
|
|
|
return "";
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* 数据库表采集
|
|
|
* @return
|
|
@ -428,45 +463,197 @@ public class DatacollectService implements IDatacollectService {
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* 采集入库
|
|
|
* @return
|
|
|
* webservice采集
|
|
|
*/
|
|
|
private String intoMongodb(List<JSONObject> list,String schemeVersion,String stdDatasetCode,JSONArray colList)
|
|
|
private String collectWebservice(DtoJobDataset ds,String schemeVersion,String logId) throws Exception
|
|
|
{
|
|
|
try{
|
|
|
if(list!=null && list.size()>0)
|
|
|
{
|
|
|
//字典转换
|
|
|
list = translateDict(list, colList,schemeVersion);
|
|
|
String message = "";
|
|
|
String datasetId = ds.getJobDatasetId();
|
|
|
String jobDatasetName = ds.getJobDatasetName();
|
|
|
String condition=ds.getJobDatasetCondition();
|
|
|
String key=ds.getJobDatasetKey();
|
|
|
String keytype=ds.getJobDatasetKeytype();
|
|
|
String keyvalue=ds.getJobDatasetKeyvalue();
|
|
|
String orgCode = ds.getOrgCode();
|
|
|
String config = ds.getConfig();
|
|
|
String datasourceId = ds.getDatasourceId();
|
|
|
//webservice地址
|
|
|
ObjectMapper objectMapper = new ObjectMapper();
|
|
|
Map<String,String> mapConfig = objectMapper.readValue(config,Map.class);
|
|
|
if(mapConfig.containsKey("protocol") && mapConfig.containsKey("url"))
|
|
|
{
|
|
|
String url = mapConfig.get("protocol") + "://" + mapConfig.get("url");
|
|
|
//获取数据集映射
|
|
|
List datasetString = stdService.getDatasetByScheme(schemeVersion, datasetId);
|
|
|
JSONArray datasetList = new JSONArray(datasetString);
|
|
|
|
|
|
//采集到mongodb
|
|
|
boolean b = mongo.insert(stdDatasetCode,list);
|
|
|
if(!b)
|
|
|
{
|
|
|
if(mongo.errorMessage!=null && mongo.errorMessage.length()>0)
|
|
|
if(datasetList!=null &&datasetList.length()>0)
|
|
|
{
|
|
|
String stdTableName = datasetList.getJSONObject(0).optString("stdDatasetCode");
|
|
|
String adapterTableName = datasetList.getJSONObject(0).optString("adapterDatasetCode");
|
|
|
//获取数据集字段映射结构
|
|
|
List colString = stdService.getDatacolByScheme(schemeVersion,datasetId);
|
|
|
JSONArray colList = new JSONArray(colString);
|
|
|
|
|
|
if(colList!=null && colList.length()>0) {
|
|
|
|
|
|
Map<String,Object> params = new HashMap<>();
|
|
|
String where = "";
|
|
|
params.put("table",adapterTableName);//表名
|
|
|
String paramsStr = "";
|
|
|
//设置最大值
|
|
|
if(keyvalue!=null && keyvalue.length()>0)
|
|
|
{
|
|
|
System.out.print(mongo.errorMessage);
|
|
|
return mongo.errorMessage;
|
|
|
if(condition.length()>0 && !condition.equals("[]"))
|
|
|
{
|
|
|
paramsStr = condition.substring(0,condition.length()-1) + ",{\"andOr\":\" AND \",\"field\":\""+key+"\",\"condition\":\" > \",\"value\":\""+keyvalue+"\"}]";
|
|
|
}
|
|
|
else{
|
|
|
paramsStr = "[{\"andOr\":\" AND \",\"field\":\""+key+"\",\"condition\":\" > \",\"value\":\""+keyvalue+"\"}]";
|
|
|
}
|
|
|
}
|
|
|
else {
|
|
|
return "Mongodb保存失败!(表:"+stdDatasetCode+")";
|
|
|
else{
|
|
|
paramsStr = condition;
|
|
|
}
|
|
|
params.put("params",paramsStr);
|
|
|
params.put("key",key);
|
|
|
params.put("keytype",keytype);
|
|
|
|
|
|
//获取总条数和最大值
|
|
|
HttpResponse responseCount = HttpHelper.get(url,params);
|
|
|
Map<String,String> mapCount = objectMapper.readValue(responseCount.getBody(),Map.class);
|
|
|
if(responseCount.getStatusCode() == 200 && mapCount.containsKey("count"))
|
|
|
{
|
|
|
Integer count = Integer.parseInt(mapCount.get("count"));
|
|
|
if(count == 0) //0条记录,无需采集
|
|
|
{
|
|
|
message = "0条记录,无需采集。";
|
|
|
}
|
|
|
else {
|
|
|
params.remove("key");
|
|
|
params.remove("keytype");
|
|
|
int successCount = 0;
|
|
|
String maxKeyvalue = mapCount.get("maxKeyvalue");
|
|
|
int countPage = 1;
|
|
|
if(count > maxNum) //分页采集
|
|
|
{
|
|
|
countPage = count/maxNum+1;
|
|
|
}
|
|
|
for(int i=0;i<countPage;i++)
|
|
|
{
|
|
|
int rows = maxNum;
|
|
|
if(i+1==countPage){
|
|
|
rows = count-i*maxNum;
|
|
|
}
|
|
|
int start = i*maxNum; //开始行数
|
|
|
|
|
|
RsJobLogDetail detail = new RsJobLogDetail();
|
|
|
detail.setStartTime(new Date());
|
|
|
detail.setJobLogId(logId);
|
|
|
detail.setDatasourceId(datasourceId);
|
|
|
detail.setConfig(config);
|
|
|
detail.setStdDatasetCode(stdTableName);
|
|
|
detail.setJobDatasetId(datasetId);
|
|
|
detail.setJobDatasetName(ds.getJobDatasetName());
|
|
|
detail.setJobId(ds.getJobId());
|
|
|
detail.setJobSql("{table:\""+adapterTableName+"\",where:\""+where+"\",start:\""+start+"\",rows:\""+rows+"\"}");
|
|
|
detail.setJobNum(i+1);
|
|
|
detail.setJobDatasetRows(rows);
|
|
|
detail.setSchemeVersion(schemeVersion);
|
|
|
|
|
|
//分页获取数据
|
|
|
params.put("start",start);
|
|
|
params.put("rows",rows);
|
|
|
HttpResponse responseList = HttpHelper.get(url,params);
|
|
|
String msg = "";
|
|
|
if(responseList.getStatusCode() == 200) {
|
|
|
JavaType javaType = objectMapper.getTypeFactory().constructParametricType(List.class, JSONObject.class);
|
|
|
List<JSONObject> list = objectMapper.readValue(responseList.getBody(), javaType);
|
|
|
|
|
|
if (list != null) {
|
|
|
List<JSONObject> listNew = new ArrayList<>();
|
|
|
//遍历所有数据,转译数据元代码
|
|
|
for(JSONObject obj : list)
|
|
|
{
|
|
|
JSONObject objNew = new JSONObject();
|
|
|
//转译字段名
|
|
|
for(int j=0; j< colList.length();j++)
|
|
|
{
|
|
|
JSONObject col = colList.getJSONObject(j);
|
|
|
String adapterMetadataCode = col.optString("adapterMetadataCode");
|
|
|
String stdMetadataCode = col.optString("stdMetadataCode");
|
|
|
|
|
|
if(obj.has(adapterMetadataCode))
|
|
|
{
|
|
|
objNew.put(stdMetadataCode,obj.get(adapterMetadataCode));
|
|
|
}
|
|
|
}
|
|
|
listNew.add(objNew);
|
|
|
}
|
|
|
|
|
|
msg = intoMongodb(listNew, schemeVersion, stdTableName, colList); //返回信息
|
|
|
} else {
|
|
|
msg = "查询数据为空!";
|
|
|
}
|
|
|
|
|
|
}
|
|
|
else{
|
|
|
msg = responseList.getBody();
|
|
|
if(msg.length()==0)
|
|
|
{
|
|
|
msg = "获取数据失败!";
|
|
|
}
|
|
|
}
|
|
|
if(msg.length()>0)
|
|
|
{
|
|
|
//任务日志细表异常操作
|
|
|
detail.setJobStatus("0");
|
|
|
detail.setJobContent(msg);
|
|
|
LogService.getLogger().info(msg); //文本日志
|
|
|
}
|
|
|
else{
|
|
|
detail.setJobStatus("1");
|
|
|
detail.setJobContent("采集成功!");
|
|
|
successCount += rows;
|
|
|
}
|
|
|
detail.setEndTime(new Date());
|
|
|
datacollectLogDao.saveEntity(detail);
|
|
|
}
|
|
|
|
|
|
//修改最大值
|
|
|
if(maxKeyvalue!=null&& maxKeyvalue.length()>0)
|
|
|
{
|
|
|
datacollectLogDao.updateJobDatasetKeyvalue(ds.getId(),maxKeyvalue);
|
|
|
LogService.getLogger().info("修改任务数据集最大值为"+maxKeyvalue+"。"); //文本日志
|
|
|
}
|
|
|
message = jobDatasetName + "采集成功"+successCount+"条数据,总条数"+count+"条。";
|
|
|
}
|
|
|
|
|
|
}
|
|
|
else{
|
|
|
throw new Exception(jobDatasetName + "获取总条数异常!"+responseCount.getBody());
|
|
|
}
|
|
|
}
|
|
|
else
|
|
|
{
|
|
|
throw new Exception(jobDatasetName + "数据集字段映射为空!");
|
|
|
}
|
|
|
|
|
|
}
|
|
|
else{
|
|
|
throw new Exception(jobDatasetName + "数据集映射为空!");
|
|
|
}
|
|
|
}
|
|
|
catch (Exception e)
|
|
|
{
|
|
|
return e.getMessage();
|
|
|
else{
|
|
|
throw new Exception("非法webservice路径!");
|
|
|
}
|
|
|
|
|
|
return "";
|
|
|
LogService.getLogger().info(message);
|
|
|
return message;
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
* 执行任务
|
|
|
*/
|
|
@ -504,7 +691,7 @@ public class DatacollectService implements IDatacollectService {
|
|
|
if (type != null) {
|
|
|
if (type.equals("1")) //Web Service
|
|
|
{
|
|
|
message = "Web Service采集。\n";
|
|
|
message = collectWebservice(ds, schemeVersion, logId) + "\n";
|
|
|
} else if (type.equals("2"))//文件系统
|
|
|
{
|
|
|
message = "文件系统采集。\n";
|