|
@ -1,5 +1,6 @@
|
|
|
package com.yihu.ehr.datacollect.service;
|
|
|
|
|
|
|
|
|
import com.fasterxml.jackson.databind.JavaType;
|
|
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
|
|
import com.yihu.ehr.common.Services;
|
|
@ -18,13 +19,26 @@ import com.yihu.ehr.framework.model.ActionResult;
|
|
|
import com.yihu.ehr.framework.util.httpclient.HttpHelper;
|
|
|
import com.yihu.ehr.framework.util.httpclient.HttpResponse;
|
|
|
import com.yihu.ehr.framework.util.log.LogService;
|
|
|
import com.yihu.ehr.framework.util.webservice.WebserviceUtil;
|
|
|
import com.yihu.ehr.resource.service.IStdService;
|
|
|
import org.apache.axis.client.Call;
|
|
|
import org.apache.cxf.endpoint.Client;
|
|
|
import org.apache.cxf.jaxws.endpoint.dynamic.JaxWsDynamicClientFactory;
|
|
|
import org.dom4j.Document;
|
|
|
import org.dom4j.DocumentHelper;
|
|
|
import org.dom4j.Element;
|
|
|
import org.dom4j.io.SAXReader;
|
|
|
import org.json.JSONObject;
|
|
|
import org.json.JSONArray;
|
|
|
import org.springframework.stereotype.Service;
|
|
|
import org.springframework.transaction.annotation.Transactional;
|
|
|
import org.xml.sax.InputSource;
|
|
|
|
|
|
import javax.annotation.Resource;
|
|
|
import javax.xml.namespace.QName;
|
|
|
import java.io.ByteArrayInputStream;
|
|
|
import java.net.HttpURLConnection;
|
|
|
import java.net.URL;
|
|
|
import java.util.*;
|
|
|
|
|
|
/**
|
|
@ -462,10 +476,217 @@ public class DatacollectService implements IDatacollectService {
|
|
|
return message;
|
|
|
}
|
|
|
|
|
|
|
|
|
/**
|
|
|
* webservice采集
|
|
|
* XML转JSONList
|
|
|
* @return
|
|
|
*/
|
|
|
private List<JSONObject> getListFromXml(String xml) throws Exception
|
|
|
{
|
|
|
SAXReader reader = new SAXReader();
|
|
|
Document doc = reader.read(new ByteArrayInputStream(xml.getBytes("UTF-8")));
|
|
|
Element root = doc.getRootElement();
|
|
|
List<JSONObject> re = new ArrayList<>();
|
|
|
|
|
|
//xml数据列表
|
|
|
Iterator iter = root.elementIterator("Data");
|
|
|
while (iter.hasNext())
|
|
|
{
|
|
|
JSONObject obj = new JSONObject();
|
|
|
Element el =(Element)iter.next();
|
|
|
Iterator cols = el.elementIterator();
|
|
|
while (cols.hasNext())
|
|
|
{
|
|
|
Element col =(Element)cols.next();
|
|
|
obj.put(col.getName().toUpperCase(),col.getStringValue());
|
|
|
}
|
|
|
re.add(obj);
|
|
|
}
|
|
|
|
|
|
return re;
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* 数据库表采集
|
|
|
* @return
|
|
|
*/
|
|
|
private String collectWebservice(DtoJobDataset ds,String schemeVersion,String logId) throws Exception
|
|
|
{
|
|
|
String message = "";
|
|
|
String datasetId = ds.getJobDatasetId();
|
|
|
String jobDatasetName = ds.getJobDatasetName();
|
|
|
String condition=ds.getJobDatasetCondition();
|
|
|
String key=ds.getJobDatasetKey();
|
|
|
String keytype=ds.getJobDatasetKeytype();
|
|
|
String keyvalue=ds.getJobDatasetKeyvalue();
|
|
|
String orgCode = ds.getOrgCode();
|
|
|
|
|
|
String datasourceId = ds.getDatasourceId();
|
|
|
String config = ds.getConfig(); //数据库连接
|
|
|
DBType dbType = DBType.Oracle;//********** 先定死Oracle ****************************
|
|
|
|
|
|
//webservice地址
|
|
|
ObjectMapper objectMapper = new ObjectMapper();
|
|
|
Map<String,String> mapConfig = objectMapper.readValue(config,Map.class);
|
|
|
if(mapConfig.containsKey("protocol") && mapConfig.containsKey("url")) {
|
|
|
String url = mapConfig.get("protocol") + "://" + mapConfig.get("url");
|
|
|
//获取数据集映射
|
|
|
List datasetString = stdService.getDatasetByScheme(schemeVersion, datasetId);
|
|
|
JSONArray datasetList = new JSONArray(datasetString);
|
|
|
|
|
|
if (datasetList != null && datasetList.length() > 0) {
|
|
|
String stdTableName = datasetList.getJSONObject(0).optString("stdDatasetCode");
|
|
|
String adapterTableName = datasetList.getJSONObject(0).optString("adapterDatasetCode");
|
|
|
//获取数据集字段映射结构
|
|
|
List colString = stdService.getDatacolByScheme(schemeVersion, datasetId);
|
|
|
JSONArray colList = new JSONArray(colString);
|
|
|
|
|
|
if (colList != null && colList.length() > 0) {
|
|
|
//拼接查询sql
|
|
|
String strSql = "Select '" + orgCode + "' as RSCOM_ORG_CODE";
|
|
|
for (int i = 0; i < colList.length(); i++) {
|
|
|
JSONObject col = colList.getJSONObject(i);
|
|
|
String adapterMetadataCode = col.optString("adapterMetadataCode");
|
|
|
if (adapterMetadataCode.length() > 0) {
|
|
|
strSql += "," + adapterMetadataCode + " as " + col.optString("stdMetadataCode");
|
|
|
}
|
|
|
}
|
|
|
strSql += " from " + adapterTableName;
|
|
|
String strWhere = " where 1=1";
|
|
|
//采集范围
|
|
|
if (condition != null && condition.length() > 0) {
|
|
|
strWhere += getCondition(dbType, condition);
|
|
|
}
|
|
|
//增量采集
|
|
|
String maxKey = "0";
|
|
|
String keyValue = ds.getJobDatasetKeyvalue();
|
|
|
if (key != null && key.length() > 0 && (keyValue != null && !keyValue.equals("null"))) {
|
|
|
maxKey = key;
|
|
|
if (keytype.toUpperCase().equals("DATE")) //时间类型
|
|
|
{
|
|
|
Date keyDate = new Date();
|
|
|
if (keyvalue != null && keyvalue.length() > 0) {
|
|
|
//字符串转时间
|
|
|
keyDate = DateConvert.toDate(keyvalue);
|
|
|
}
|
|
|
//根据数据库类型获取时间sql
|
|
|
strWhere += " and " + key + ">'" + getDateSqlByDBType(dbType, keyDate) + "'";
|
|
|
strWhere += " order by " + key;
|
|
|
} else if (keytype.toUpperCase().equals("VARCHAR")) //字符串类型
|
|
|
{
|
|
|
maxKey = getToNumberSqlByDBType(dbType, key);
|
|
|
if (keyvalue != null && keyvalue.length() > 0) {
|
|
|
strWhere += " and " + maxKey + ">'" + keyvalue + "'";
|
|
|
}
|
|
|
strWhere += " order by " + maxKey;
|
|
|
} else {
|
|
|
if (keyvalue != null && keyvalue.length() > 0) {
|
|
|
strWhere += " and " + key + ">'" + keyvalue + "'";
|
|
|
}
|
|
|
strWhere += " order by " + key;
|
|
|
}
|
|
|
}
|
|
|
strSql += strWhere;
|
|
|
//总条数和最大值查询
|
|
|
String sqlCount = "select count(1) as COUNT,max(" + maxKey + ") as MAX_KEYVALUE from " + adapterTableName + strWhere;
|
|
|
|
|
|
//webservice获取数据//获取总条数和最大值
|
|
|
String strCount = WebserviceUtil.request(url,"ExcuteSQL",new Object[]{"",sqlCount});
|
|
|
List<JSONObject> dataCount = getListFromXml(strCount);
|
|
|
|
|
|
if (dataCount!=null &&dataCount.size()>0) {
|
|
|
Integer count = Integer.parseInt(dataCount.get(0).getString("COUNT"));
|
|
|
if (count == 0) //0条记录,无需采集
|
|
|
{
|
|
|
message = "0条记录,无需采集。";
|
|
|
}
|
|
|
else {
|
|
|
int successCount = 0;
|
|
|
String maxKeyvalue = dataCount.get(0).getString("MAX_KEYVALUE");
|
|
|
int countPage = 1;
|
|
|
if (count > maxNum) //分页采集
|
|
|
{
|
|
|
countPage = count / maxNum + 1;
|
|
|
}
|
|
|
for (int i = 0; i < countPage; i++) {
|
|
|
int rows = maxNum;
|
|
|
if (i + 1 == countPage) {
|
|
|
rows = count - i * maxNum;
|
|
|
}
|
|
|
String sql = getPageSqlByDBType(dbType, strSql, i * maxNum, rows); //获取分页sql语句
|
|
|
|
|
|
RsJobLogDetail detail = new RsJobLogDetail();
|
|
|
detail.setStartTime(new Date());
|
|
|
detail.setJobLogId(logId);
|
|
|
detail.setDatasourceId(datasourceId);
|
|
|
detail.setConfig(config);
|
|
|
detail.setStdDatasetCode(stdTableName);
|
|
|
detail.setJobDatasetId(datasetId);
|
|
|
detail.setJobDatasetName(ds.getJobDatasetName());
|
|
|
detail.setJobId(ds.getJobId());
|
|
|
detail.setJobSql(sql);
|
|
|
detail.setJobNum(i + 1);
|
|
|
detail.setJobDatasetRows(rows);
|
|
|
detail.setSchemeVersion(schemeVersion);
|
|
|
|
|
|
String msg = "";
|
|
|
try {
|
|
|
//获取分页数据
|
|
|
String strList = WebserviceUtil.request(url, "ExcuteSQL", new Object[]{"", sql});
|
|
|
List<JSONObject> list = getListFromXml(strList);
|
|
|
if (list != null) {
|
|
|
msg = intoMongodb(list, schemeVersion, stdTableName, colList); //返回信息
|
|
|
} else {
|
|
|
msg = "查询数据为空!";
|
|
|
}
|
|
|
|
|
|
if (msg.length() > 0) {
|
|
|
//任务日志细表异常操作
|
|
|
detail.setJobStatus("0");
|
|
|
detail.setJobContent(msg);
|
|
|
LogService.getLogger().info(msg); //文本日志
|
|
|
} else {
|
|
|
detail.setJobStatus("1");
|
|
|
detail.setJobContent("采集成功!");
|
|
|
successCount += rows;
|
|
|
}
|
|
|
}
|
|
|
catch (Exception ex)
|
|
|
{
|
|
|
msg=ex.getMessage();
|
|
|
}
|
|
|
detail.setEndTime(new Date());
|
|
|
datacollectLogDao.saveEntity(detail);
|
|
|
}
|
|
|
|
|
|
//修改最大值
|
|
|
if (maxKeyvalue != null && maxKeyvalue.length() > 0) {
|
|
|
datacollectLogDao.updateJobDatasetKeyvalue(ds.getId(), maxKeyvalue);
|
|
|
LogService.getLogger().info("修改任务数据集最大值为" + maxKeyvalue + "。"); //文本日志
|
|
|
}
|
|
|
|
|
|
message = jobDatasetName + "采集成功" + successCount + "条数据,总条数" + count + "条。";
|
|
|
}
|
|
|
}
|
|
|
} else {
|
|
|
throw new Exception(jobDatasetName + "数据集字段映射为空!");
|
|
|
}
|
|
|
} else {
|
|
|
throw new Exception(jobDatasetName + "数据集映射为空!");
|
|
|
}
|
|
|
}
|
|
|
else{
|
|
|
throw new Exception("非法webservice路径!");
|
|
|
}
|
|
|
|
|
|
LogService.getLogger().info(message);
|
|
|
return message;
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* webservice采集(参数)
|
|
|
*//*
|
|
|
private String collectWebservice(DtoJobDataset ds,String schemeVersion,String logId) throws Exception
|
|
|
{
|
|
|
String message = "";
|
|
|
String datasetId = ds.getJobDatasetId();
|
|
@ -650,7 +871,7 @@ public class DatacollectService implements IDatacollectService {
|
|
|
|
|
|
LogService.getLogger().info(message);
|
|
|
return message;
|
|
|
}
|
|
|
}*/
|
|
|
|
|
|
|
|
|
|
|
@ -717,6 +938,7 @@ public class DatacollectService implements IDatacollectService {
|
|
|
}
|
|
|
|
|
|
} catch (Exception ex) {
|
|
|
ex.printStackTrace();
|
|
|
LogService.getLogger().info("异常:" + ex.getMessage());
|
|
|
logStr.append(ex.getMessage() + "\n");
|
|
|
logStr.append("/*********** 出现异常,中断采集 *******************/\n");
|
|
@ -793,4 +1015,12 @@ public class DatacollectService implements IDatacollectService {
|
|
|
return new ActionResult(true,"补采成功!");
|
|
|
}
|
|
|
}
|
|
|
|
|
|
public static void main(String[] args) throws Exception{
|
|
|
//namespace是命名空间,methodName是方法名
|
|
|
String sql = "select count(1) as COUNT,max(to_number(HDSD03_01_031)) as MAX_KEYVALUE from HDSC01_02 where 1=1 order by to_number(HDSD03_01_031)";
|
|
|
//调用web Service//输出调用结果
|
|
|
System.out.println(WebserviceUtil.request("http://172.19.103.71:8080/service/sql?wsdl", "ExcuteSQL", new Object[]{"", sql}));
|
|
|
|
|
|
}
|
|
|
}
|