|
@ -1,9 +1,7 @@
|
|
|
package com.yihu.ehr.datacollect.service;
|
|
|
|
|
|
|
|
|
import com.fasterxml.jackson.databind.JavaType;
|
|
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
|
|
import com.mongodb.client.MongoCollection;
|
|
|
import com.yihu.ehr.common.Services;
|
|
|
import com.yihu.ehr.crawler.model.config.SysConfig;
|
|
|
import com.yihu.ehr.crawler.model.patient.PatientIdentity;
|
|
@ -16,35 +14,24 @@ import com.yihu.ehr.dbhelper.common.QueryCondition;
|
|
|
import com.yihu.ehr.dbhelper.common.enums.DBType;
|
|
|
import com.yihu.ehr.dbhelper.common.sqlparser.*;
|
|
|
import com.yihu.ehr.dbhelper.jdbc.DBHelper;
|
|
|
import com.yihu.ehr.dbhelper.mongodb.MongodbFactory;
|
|
|
import com.yihu.ehr.dbhelper.mongodb.MongodbHelper;
|
|
|
import com.yihu.ehr.framework.constrant.Constants;
|
|
|
import com.yihu.ehr.framework.constrant.SqlConstants;
|
|
|
import com.yihu.ehr.framework.constrant.DateConvert;
|
|
|
import com.yihu.ehr.framework.model.ActionResult;
|
|
|
import com.yihu.ehr.framework.util.httpclient.HttpHelper;
|
|
|
import com.yihu.ehr.framework.util.httpclient.HttpResponse;
|
|
|
import com.yihu.ehr.framework.util.log.LogService;
|
|
|
import com.yihu.ehr.framework.util.operator.CollectionUtil;
|
|
|
import com.yihu.ehr.framework.util.log.Logger;
|
|
|
import com.yihu.ehr.framework.util.log.LoggerFactory;
|
|
|
import com.yihu.ehr.framework.util.webservice.WebserviceUtil;
|
|
|
import com.yihu.ehr.resource.service.IStdService;
|
|
|
import org.apache.axis.client.Call;
|
|
|
import org.apache.cxf.endpoint.Client;
|
|
|
import org.apache.cxf.jaxws.endpoint.dynamic.JaxWsDynamicClientFactory;
|
|
|
import org.dom4j.Document;
|
|
|
import org.dom4j.DocumentHelper;
|
|
|
import org.dom4j.Element;
|
|
|
import org.dom4j.io.SAXReader;
|
|
|
import org.json.JSONObject;
|
|
|
import org.json.JSONArray;
|
|
|
import org.springframework.stereotype.Service;
|
|
|
import org.springframework.transaction.annotation.Transactional;
|
|
|
import org.xml.sax.InputSource;
|
|
|
|
|
|
import javax.annotation.Resource;
|
|
|
import javax.xml.namespace.QName;
|
|
|
import java.io.ByteArrayInputStream;
|
|
|
import java.net.HttpURLConnection;
|
|
|
import java.net.URL;
|
|
|
import java.text.ParseException;
|
|
|
import java.text.SimpleDateFormat;
|
|
|
import java.util.*;
|
|
@ -54,49 +41,185 @@ import java.util.*;
|
|
|
*/
|
|
|
@Service(Services.DatacollectService)
|
|
|
public class DatacollectService implements IDatacollectService {
|
|
|
|
|
|
static private final Logger logger = LoggerFactory.getLogger(DatacollectService.class);
|
|
|
MongodbHelper mongoOrigin = new MongodbHelper("origin");
|
|
|
MongodbHelper mongo = new MongodbHelper();
|
|
|
String dateFormat = "yyyy-MM-dd HH:mm:ss"; //默认时间字符串格式
|
|
|
int maxNum = 1000; //查询条数限制
|
|
|
@Resource(name = Services.Datacollect)
|
|
|
private IDatacollectManager datacollect;
|
|
|
|
|
|
@Resource(name = Services.StdService)
|
|
|
private IStdService stdService;
|
|
|
|
|
|
@Resource(name = "DatacollectDao")
|
|
|
private IDatacollectDao datacollectDao;
|
|
|
|
|
|
@Resource(name = "DatacollectLogDao")
|
|
|
private IDatacollectLogDao datacollectLogDao;
|
|
|
|
|
|
MongodbHelper mongoOrigin = new MongodbHelper("origin");
|
|
|
MongodbHelper mongo = new MongodbHelper();
|
|
|
|
|
|
String dateFormat = "yyyy-MM-dd HH:mm:ss"; //默认时间字符串格式
|
|
|
int maxNum = 1000; //查询条数限制
|
|
|
|
|
|
/**
|
|
|
* 根据连接字符串获取数据库类型
|
|
|
*/
|
|
|
private static DBType getDbType(String uri) {
|
|
|
return uri.startsWith("jdbc:mysql")?DBType.Mysql:(uri.startsWith("jdbc:oracle")?DBType.Oracle:(uri.startsWith("jdbc:hive2")?DBType.Hive:(uri.startsWith("jdbc:microsoft:sqlserver")?DBType.Sqlserver:DBType.Mysql)));
|
|
|
return uri.startsWith("jdbc:mysql") ? DBType.Mysql : (uri.startsWith("jdbc:oracle") ? DBType.Oracle : (uri.startsWith("jdbc:hive2") ? DBType.Hive : (uri.startsWith("jdbc:microsoft:sqlserver") ? DBType.Sqlserver : DBType.Mysql)));
|
|
|
}
|
|
|
|
|
|
public static void main(String[] args) throws Exception {
|
|
|
//namespace是命名空间,methodName是方法名
|
|
|
String sql = "select count(1) as COUNT,max(to_number(HDSD03_01_031)) as MAX_KEYVALUE from HDSC01_02 where 1=1 order by to_number(HDSD03_01_031)";
|
|
|
//调用web Service//输出调用结果
|
|
|
System.out.println(WebserviceUtil.request("http://172.19.103.71:8080/service/sql?wsdl", "ExcuteSQL", new Object[]{"", sql}));
|
|
|
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* 执行任务
|
|
|
*/
|
|
|
@Override
|
|
|
public void executeJob(String jobId) throws Exception {
|
|
|
|
|
|
//获取任务详细信息
|
|
|
RsJobConfig job = datacollect.getJobById(jobId);
|
|
|
RsJobLog log = new RsJobLog();
|
|
|
log.setJobId(jobId);
|
|
|
log.setJobStartTime(new Date());
|
|
|
datacollectLogDao.saveEntity(log);
|
|
|
String logId = log.getId();
|
|
|
logger.info("任务" + jobId + "开始采集,新增日志" + logId + "。");
|
|
|
|
|
|
StringBuilder logStr = new StringBuilder();
|
|
|
int count = 0;
|
|
|
int success = 0;
|
|
|
|
|
|
try {
|
|
|
String schemeVersion = job.getSchemeVersion();
|
|
|
//获取任务相关数据集
|
|
|
List<DtoJobDataset> list = datacollectDao.getDatacollectDataset(jobId);
|
|
|
logger.info("获取任务相关数据集,数量" + list.size() + "。");
|
|
|
|
|
|
if (list != null && list.size() > 0) {
|
|
|
count = list.size();
|
|
|
logStr.append("/*********** 开始采集 *******************/\n");
|
|
|
//遍历数据集
|
|
|
for (DtoJobDataset ds : list) {
|
|
|
try {
|
|
|
String type = ds.getType();
|
|
|
String message = "";
|
|
|
logStr.append(DateConvert.toString(new Date(), dateFormat) + " " + ds.getJobDatasetName());
|
|
|
if (type != null) {
|
|
|
if (type.equals("1")) //Web Service
|
|
|
{
|
|
|
message = collectWebservice(ds, schemeVersion, logId) + "\n";
|
|
|
} else if (type.equals("2"))//文件系统
|
|
|
{
|
|
|
message = "文件系统采集。\n";
|
|
|
} else { //数据库
|
|
|
message = collectTable(ds, schemeVersion, logId) + "\n";
|
|
|
}
|
|
|
} else {
|
|
|
message = ds.getJobDatasetName() + "未关联数据源!\n";
|
|
|
}
|
|
|
|
|
|
|
|
|
logger.info(message); //文本日志
|
|
|
logStr.append(message);
|
|
|
success++;
|
|
|
} catch (Exception ex) {
|
|
|
logger.info("异常:" + ex.getMessage());
|
|
|
logStr.append(ex.getMessage() + "\n");
|
|
|
}
|
|
|
}
|
|
|
logStr.append("/*********** 结束采集 *******************/\n");
|
|
|
}
|
|
|
|
|
|
} catch (Exception ex) {
|
|
|
ex.printStackTrace();
|
|
|
logger.info("异常:" + ex.getMessage());
|
|
|
logStr.append(ex.getMessage() + "\n");
|
|
|
logStr.append("/*********** 出现异常,中断采集 *******************/\n");
|
|
|
}
|
|
|
|
|
|
|
|
|
//任务主日志成功
|
|
|
String jobContent = logStr.toString().replace("\"", "\\\"");
|
|
|
if (jobContent.length() > 4000) {
|
|
|
jobContent = jobContent.substring(0, 4000);
|
|
|
}
|
|
|
log.setJobContent(jobContent);
|
|
|
log.setJobEndTime(new Date());
|
|
|
log.setJobDatasetCount(count);
|
|
|
log.setJobDatasetSuccess(success);
|
|
|
logger.info("任务结束," + count + "个数据集成功采集" + success + "个。");
|
|
|
datacollectLogDao.updateEntity(log);
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* 根据日志详细补采数据
|
|
|
*/
|
|
|
@Override
|
|
|
@Transactional
|
|
|
public ActionResult repeatJob(String id) throws Exception {
|
|
|
RsJobLogDetail log = datacollectLogDao.getEntity(RsJobLogDetail.class, id);
|
|
|
|
|
|
if (log.getJobStatus().equals("2")) {
|
|
|
return new ActionResult(false, "数据补采中!");
|
|
|
}
|
|
|
if (!log.getJobStatus().equals("0")) {
|
|
|
return new ActionResult(false, "数据无需补采!");
|
|
|
}
|
|
|
|
|
|
try {
|
|
|
log.setRepeatStartTime(new Date());
|
|
|
log.setJobStatus("2"); //设置采集中状态
|
|
|
datacollectLogDao.updateEntity(log);
|
|
|
} catch (Exception e) {
|
|
|
return new ActionResult(false, "补采失败!");
|
|
|
}
|
|
|
log.setJobStatus("0");
|
|
|
datacollectLogDao.updateEntity(log);
|
|
|
|
|
|
String stdDatasetCode = log.getStdDatasetCode();
|
|
|
String sql = log.getJobSql();
|
|
|
//数据库连接
|
|
|
String datasourceId = log.getDatasourceId();
|
|
|
String config = log.getConfig();
|
|
|
DBHelper db = new DBHelper(datasourceId, config);
|
|
|
//获取数据集字段映射结构
|
|
|
String schemeVersion = log.getSchemeVersion();
|
|
|
String datasetId = log.getJobDatasetId();
|
|
|
List colString = stdService.getDatacolByScheme(schemeVersion, datasetId);
|
|
|
JSONArray colList = new JSONArray(colString);
|
|
|
List<JSONObject> list = db.query(sql);
|
|
|
String message = intoMongodb(list, schemeVersion, stdDatasetCode, colList);
|
|
|
if (message.length() > 0 || db.errorMessage.length() > 0) {
|
|
|
log.setJobStatus("0");
|
|
|
log.setRepeatEndTime(new Date());
|
|
|
if (message.length() > 0) {
|
|
|
log.setRepeatJobContent(message);
|
|
|
} else {
|
|
|
db.errorMessage.length();
|
|
|
}
|
|
|
datacollectLogDao.updateEntity(log);
|
|
|
return new ActionResult(false, "补采失败!");
|
|
|
} else {
|
|
|
log.setJobStatus("3");
|
|
|
log.setRepeatEndTime(new Date());
|
|
|
log.setRepeatJobContent("补采成功!");
|
|
|
datacollectLogDao.updateEntity(log);
|
|
|
return new ActionResult(true, "补采成功!");
|
|
|
}
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* 根据数据库类型获取时间sql
|
|
|
*
|
|
|
* @return
|
|
|
*/
|
|
|
private String getDateSqlByDBType(DBType dbType,Date date) throws Exception
|
|
|
{
|
|
|
private String getDateSqlByDBType(DBType dbType, Date date) throws Exception {
|
|
|
String val = DateConvert.toString(date, dateFormat);
|
|
|
if(dbType.equals(DBType.Mysql))
|
|
|
{
|
|
|
if (dbType.equals(DBType.Mysql)) {
|
|
|
return "date_format(\'" + val + "\',\'" + dateFormat + "\')";
|
|
|
}
|
|
|
else if(dbType.equals(DBType.Oracle))
|
|
|
{
|
|
|
} else if (dbType.equals(DBType.Oracle)) {
|
|
|
return "to_date(\'" + val + "\',\'" + dateFormat + "\')";
|
|
|
}
|
|
|
else
|
|
|
{
|
|
|
} else {
|
|
|
return val;
|
|
|
}
|
|
|
}
|
|
@ -104,38 +227,27 @@ public class DatacollectService implements IDatacollectService {
|
|
|
/**
|
|
|
* 根据数据库类型获取转换数值型sql
|
|
|
*/
|
|
|
private String getToNumberSqlByDBType(DBType dbType,String key) throws Exception
|
|
|
{
|
|
|
if(dbType.equals(DBType.Mysql))
|
|
|
{
|
|
|
return "cast("+key+" as signed integer)";
|
|
|
}
|
|
|
else if(dbType.equals(DBType.Oracle))
|
|
|
{
|
|
|
private String getToNumberSqlByDBType(DBType dbType, String key) throws Exception {
|
|
|
if (dbType.equals(DBType.Mysql)) {
|
|
|
return "cast(" + key + " as signed integer)";
|
|
|
} else if (dbType.equals(DBType.Oracle)) {
|
|
|
return "to_number(" + key + ")";
|
|
|
}
|
|
|
else
|
|
|
{
|
|
|
} else {
|
|
|
return key;
|
|
|
}
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* 根据数据库类型获取分页sql
|
|
|
*
|
|
|
* @return
|
|
|
*/
|
|
|
private String getPageSqlByDBType(DBType dbType,String sql,int start,int rows) throws Exception
|
|
|
{
|
|
|
if(dbType.equals(DBType.Mysql))
|
|
|
{
|
|
|
private String getPageSqlByDBType(DBType dbType, String sql, int start, int rows) throws Exception {
|
|
|
if (dbType.equals(DBType.Mysql)) {
|
|
|
return sql + " LIMIT " + start + "," + rows;
|
|
|
}
|
|
|
else if(dbType.equals(DBType.Oracle))
|
|
|
{
|
|
|
return " select * from (select t.*,ROWNUM RSCOM_RN from (" + sql + ") t where ROWNUM<" + (start+rows+1) + ") where RSCOM_RN>= " + (start+1);
|
|
|
}
|
|
|
else
|
|
|
{
|
|
|
} else if (dbType.equals(DBType.Oracle)) {
|
|
|
return " select * from (select t.*,ROWNUM RSCOM_RN from (" + sql + ") t where ROWNUM<" + (start + rows + 1) + ") where RSCOM_RN>= " + (start + 1);
|
|
|
} else {
|
|
|
return sql;
|
|
|
}
|
|
|
}
|
|
@ -143,17 +255,14 @@ public class DatacollectService implements IDatacollectService {
|
|
|
/**
|
|
|
* 字典全转换成中文
|
|
|
*/
|
|
|
private List<JSONObject> translateDictCN(List<JSONObject> list,JSONArray colList,String schemeVersion) throws Exception
|
|
|
{
|
|
|
private List<JSONObject> translateDictCN(List<JSONObject> list, JSONArray colList, String schemeVersion) throws Exception {
|
|
|
//获取字典列表
|
|
|
List<DtoDictCol> dictColList = new ArrayList<>();
|
|
|
|
|
|
for(int i=0; i< colList.length();i++)
|
|
|
{
|
|
|
for (int i = 0; i < colList.length(); i++) {
|
|
|
JSONObject col = colList.getJSONObject(i);
|
|
|
String dictId = col.optString("adapterDictId");
|
|
|
if(dictId!=null && dictId.length()>0)
|
|
|
{
|
|
|
if (dictId != null && dictId.length() > 0) {
|
|
|
String dictType = col.optString("adapterDataType");
|
|
|
String stdMetadataCode = col.optString("stdMetadataCode");
|
|
|
DtoDictCol dictCol = new DtoDictCol();
|
|
@ -161,7 +270,7 @@ public class DatacollectService implements IDatacollectService {
|
|
|
dictCol.setStdDictId(dictId);
|
|
|
dictCol.setAdapterDataType(dictType.length() > 0 ? dictType : "1");//默认通过code转换字典
|
|
|
//获取字典数据
|
|
|
List dictString = stdService.getDictByScheme(schemeVersion,dictId);
|
|
|
List dictString = stdService.getDictByScheme(schemeVersion, dictId);
|
|
|
JSONArray dictAdapterArray = new JSONArray(dictString);
|
|
|
|
|
|
dictCol.setDictList(dictAdapterArray);
|
|
@ -170,17 +279,15 @@ public class DatacollectService implements IDatacollectService {
|
|
|
}
|
|
|
|
|
|
//翻译列表
|
|
|
for(JSONObject data :list)
|
|
|
{
|
|
|
for (JSONObject data : list) {
|
|
|
//遍历字典字段
|
|
|
for (DtoDictCol col : dictColList) {
|
|
|
String colNmae = col.getStdMetadataCode();
|
|
|
String oldValue = data.optString(colNmae);
|
|
|
String newValue = translateDictValueCN(oldValue,col.getAdapterDataType(),col.getDictList());
|
|
|
String newValue = translateDictValueCN(oldValue, col.getAdapterDataType(), col.getDictList());
|
|
|
|
|
|
if(newValue!=null && newValue.length()>0)
|
|
|
{
|
|
|
data.put(colNmae,newValue);
|
|
|
if (newValue != null && newValue.length() > 0) {
|
|
|
data.put(colNmae, newValue);
|
|
|
}
|
|
|
}
|
|
|
}
|
|
@ -190,21 +297,20 @@ public class DatacollectService implements IDatacollectService {
|
|
|
|
|
|
/**
|
|
|
* 转译字典成中文
|
|
|
*
|
|
|
* @return
|
|
|
*/
|
|
|
private String translateDictValueCN(String oldValue,String type,JSONArray dictAdapterList) throws Exception
|
|
|
{
|
|
|
if(type.equals("0")) //原本就是值
|
|
|
private String translateDictValueCN(String oldValue, String type, JSONArray dictAdapterList) throws Exception {
|
|
|
if (type.equals("0")) //原本就是值
|
|
|
{
|
|
|
return oldValue;
|
|
|
}
|
|
|
|
|
|
|
|
|
//遍历字典数据(编码->名称)
|
|
|
for(int i=0; i< dictAdapterList.length();i++)
|
|
|
{
|
|
|
for (int i = 0; i < dictAdapterList.length(); i++) {
|
|
|
JSONObject dictItem = dictAdapterList.getJSONObject(i);
|
|
|
if(oldValue!=null && dictItem.has("stdEntryCode")) {
|
|
|
if (oldValue != null && dictItem.has("stdEntryCode")) {
|
|
|
if (oldValue.equals(dictItem.getString("stdEntryCode"))) {
|
|
|
String newValue = dictItem.getString("stdEntryValue"); //名称
|
|
|
return newValue;
|
|
@ -217,30 +323,28 @@ public class DatacollectService implements IDatacollectService {
|
|
|
|
|
|
/**
|
|
|
* 字典转换
|
|
|
*
|
|
|
* @param list
|
|
|
* @param colList
|
|
|
* @return
|
|
|
* @throws Exception
|
|
|
*/
|
|
|
private List<JSONObject> translateDict(List<JSONObject> list,JSONArray colList,String schemeVersion) throws Exception
|
|
|
{
|
|
|
private List<JSONObject> translateDict(List<JSONObject> list, JSONArray colList, String schemeVersion) throws Exception {
|
|
|
//获取字典列表
|
|
|
List<DtoDictCol> dictColList = new ArrayList<>();
|
|
|
|
|
|
for(int i=0; i< colList.length();i++)
|
|
|
{
|
|
|
for (int i = 0; i < colList.length(); i++) {
|
|
|
JSONObject col = colList.getJSONObject(i);
|
|
|
String dictId = col.optString("adapterDictId");
|
|
|
if(dictId!=null && dictId.length()>0)
|
|
|
{
|
|
|
if (dictId != null && dictId.length() > 0) {
|
|
|
String dictType = col.optString("adapterDataType");
|
|
|
String stdMetadataCode = col.optString("stdMetadataCode");
|
|
|
DtoDictCol dictCol = new DtoDictCol();
|
|
|
dictCol.setStdMetadataCode(stdMetadataCode);
|
|
|
dictCol.setStdDictId(dictId);
|
|
|
dictCol.setAdapterDataType(dictType.length()>0?dictType:"1");//默认通过code转换字典
|
|
|
dictCol.setAdapterDataType(dictType.length() > 0 ? dictType : "1");//默认通过code转换字典
|
|
|
//获取字典数据
|
|
|
List dictString = stdService.getDictByScheme(schemeVersion,dictId);
|
|
|
List dictString = stdService.getDictByScheme(schemeVersion, dictId);
|
|
|
JSONArray dictAdapterArray = new JSONArray(dictString);
|
|
|
|
|
|
dictCol.setDictList(dictAdapterArray);
|
|
@ -249,17 +353,15 @@ public class DatacollectService implements IDatacollectService {
|
|
|
}
|
|
|
|
|
|
//翻译列表
|
|
|
for(JSONObject data :list)
|
|
|
{
|
|
|
for (JSONObject data : list) {
|
|
|
//遍历字典字段
|
|
|
for (DtoDictCol col : dictColList) {
|
|
|
String colNmae = col.getStdMetadataCode();
|
|
|
String oldValue = data.optString(colNmae);
|
|
|
String newValue = translateDictValue(oldValue,col.getAdapterDataType(),col.getDictList());
|
|
|
String newValue = translateDictValue(oldValue, col.getAdapterDataType(), col.getDictList());
|
|
|
|
|
|
if(newValue!=null && newValue.length()>0)
|
|
|
{
|
|
|
data.put(colNmae,newValue);
|
|
|
if (newValue != null && newValue.length() > 0) {
|
|
|
data.put(colNmae, newValue);
|
|
|
}
|
|
|
}
|
|
|
}
|
|
@ -269,22 +371,21 @@ public class DatacollectService implements IDatacollectService {
|
|
|
|
|
|
/**
|
|
|
* 转译字典
|
|
|
*
|
|
|
* @return
|
|
|
*/
|
|
|
private String translateDictValue(String oldValue,String type,JSONArray dictAdapterList) throws Exception
|
|
|
{
|
|
|
private String translateDictValue(String oldValue, String type, JSONArray dictAdapterList) throws Exception {
|
|
|
//应用标准字段
|
|
|
String colName = "adapterEntryCode";
|
|
|
if(type.equals("0")) //通过name转译
|
|
|
if (type.equals("0")) //通过name转译
|
|
|
{
|
|
|
colName = "adapterEntryValue";
|
|
|
}
|
|
|
|
|
|
//遍历字典数据
|
|
|
for(int i=0; i< dictAdapterList.length();i++)
|
|
|
{
|
|
|
for (int i = 0; i < dictAdapterList.length(); i++) {
|
|
|
JSONObject dictItem = dictAdapterList.getJSONObject(i);
|
|
|
if(oldValue!=null && dictItem.has(colName)) {
|
|
|
if (oldValue != null && dictItem.has(colName)) {
|
|
|
if (oldValue.equals(dictItem.getString(colName))) {
|
|
|
String newValue = dictItem.getString("stdEntryCode");
|
|
|
return newValue;
|
|
@ -296,30 +397,27 @@ public class DatacollectService implements IDatacollectService {
|
|
|
return "";
|
|
|
}
|
|
|
|
|
|
|
|
|
/**
|
|
|
* 获取过滤条件
|
|
|
*
|
|
|
* @return
|
|
|
*/
|
|
|
private String getCondition(DBType dbType,String conditionString){
|
|
|
private String getCondition(DBType dbType, String conditionString) {
|
|
|
JSONArray array = new JSONArray(conditionString);
|
|
|
|
|
|
if(array!=null && array.length()>0)
|
|
|
{
|
|
|
if (array != null && array.length() > 0) {
|
|
|
List<QueryCondition> conditions = new ArrayList<>();
|
|
|
for(Object item : array)
|
|
|
{
|
|
|
JSONObject obj = (JSONObject)item;
|
|
|
for (Object item : array) {
|
|
|
JSONObject obj = (JSONObject) item;
|
|
|
String logical = obj.getString("andOr");
|
|
|
String operation= obj.getString("condition");
|
|
|
String field= obj.getString("field");
|
|
|
String operation = obj.getString("condition");
|
|
|
String field = obj.getString("field");
|
|
|
String keyword = obj.getString("value");
|
|
|
conditions.add(new QueryCondition(logical, operation, field, keyword));
|
|
|
}
|
|
|
//条件语句转换
|
|
|
ParserSql ps;
|
|
|
switch (dbType)
|
|
|
{
|
|
|
switch (dbType) {
|
|
|
case Oracle:
|
|
|
ps = new ParserOracle();
|
|
|
break;
|
|
@ -336,19 +434,19 @@ public class DatacollectService implements IDatacollectService {
|
|
|
|
|
|
/**
|
|
|
* 获取条件SQL
|
|
|
*
|
|
|
* @param dbType
|
|
|
* @param conditionString
|
|
|
* @return
|
|
|
* @throws ParseException
|
|
|
*/
|
|
|
private String getConditionSql(DBType dbType,String conditionString) throws ParseException {
|
|
|
private String getConditionSql(DBType dbType, String conditionString) throws ParseException {
|
|
|
String conditionSql = "";
|
|
|
JSONArray conditions = new JSONArray(conditionString);
|
|
|
Iterator iterator = conditions.iterator();
|
|
|
|
|
|
while (iterator.hasNext())
|
|
|
{
|
|
|
JSONObject condition = (JSONObject)iterator.next();
|
|
|
while (iterator.hasNext()) {
|
|
|
JSONObject condition = (JSONObject) iterator.next();
|
|
|
String logic = condition.getString("condition");
|
|
|
String andOr = condition.getString("andOr");
|
|
|
String field = condition.getString("field");
|
|
@ -356,38 +454,26 @@ public class DatacollectService implements IDatacollectService {
|
|
|
String fieldType = condition.getString("type");
|
|
|
String keys = "";
|
|
|
|
|
|
if(andOr.equals(" AND "))
|
|
|
{
|
|
|
if (andOr.equals(" AND ")) {
|
|
|
conditionSql = conditionSql + " and ";
|
|
|
}
|
|
|
else
|
|
|
{
|
|
|
} else {
|
|
|
conditionSql = conditionSql + " or ";
|
|
|
}
|
|
|
|
|
|
if(logic.equals(" IN ") || logic.equals(" NOT IN "))
|
|
|
{
|
|
|
if (logic.equals(" IN ") || logic.equals(" NOT IN ")) {
|
|
|
String[] keywords = value.split(",");
|
|
|
|
|
|
for(String key : keywords)
|
|
|
{
|
|
|
for (String key : keywords) {
|
|
|
keys += "'" + key + "',";
|
|
|
}
|
|
|
|
|
|
keys = " (" + keys.substring(0,keys.length() - 1) + ") ";
|
|
|
}
|
|
|
else if(logic.equals(" LIKE "))
|
|
|
{
|
|
|
keys = " (" + keys.substring(0, keys.length() - 1) + ") ";
|
|
|
} else if (logic.equals(" LIKE ")) {
|
|
|
keys += " '%" + value + "%' ";
|
|
|
}
|
|
|
else
|
|
|
{
|
|
|
if(fieldType.equals("DATE"))
|
|
|
{
|
|
|
keys += getDateFormatSql(dbType,value);
|
|
|
}
|
|
|
else
|
|
|
{
|
|
|
} else {
|
|
|
if (fieldType.equals("DATE")) {
|
|
|
keys += getDateFormatSql(dbType, value);
|
|
|
} else {
|
|
|
keys += " '" + value + "' ";
|
|
|
}
|
|
|
}
|
|
@ -400,19 +486,19 @@ public class DatacollectService implements IDatacollectService {
|
|
|
|
|
|
/**
|
|
|
* 获取对应数据库时间格式
|
|
|
*
|
|
|
* @param dbType
|
|
|
* @param key
|
|
|
* @return
|
|
|
* @throws ParseException
|
|
|
*/
|
|
|
private String getDateFormatSql(DBType dbType,String key) throws ParseException {
|
|
|
private String getDateFormatSql(DBType dbType, String key) throws ParseException {
|
|
|
String dateFormat = "yyyy-MM-dd HH:mm:ss";
|
|
|
SimpleDateFormat formatDate = new SimpleDateFormat("yyyy-MM-dd");
|
|
|
Date d = formatDate.parse(key);
|
|
|
SimpleDateFormat format = new SimpleDateFormat(dateFormat);
|
|
|
|
|
|
switch (dbType)
|
|
|
{
|
|
|
switch (dbType) {
|
|
|
case Oracle:
|
|
|
key = "to_date(\'" + format.format(d) + "\',\'YYYY-MM-DD HH24:MI:SS\')";
|
|
|
break;
|
|
@ -428,46 +514,40 @@ public class DatacollectService implements IDatacollectService {
|
|
|
|
|
|
/**
|
|
|
* 采集入库
|
|
|
*
|
|
|
* @return
|
|
|
*/
|
|
|
private String intoMongodb(List<JSONObject> list,String schemeVersion,String stdDatasetCode,JSONArray colList)
|
|
|
{
|
|
|
String patientIdCode = Constants.PATIENT_ID.toUpperCase();
|
|
|
String eventNoCode = Constants.EVENT_NO.toUpperCase();
|
|
|
private String intoMongodb(List<JSONObject> list, String schemeVersion, String stdDatasetCode, JSONArray colList) {
|
|
|
String patientIdCode = SqlConstants.PATIENT_ID.toUpperCase();
|
|
|
String eventNoCode = SqlConstants.EVENT_NO.toUpperCase();
|
|
|
PatientIdentity patientIdentity = SysConfig.getInstance().getPatientIdentity(stdDatasetCode);
|
|
|
if (patientIdentity != null) {
|
|
|
patientIdCode = patientIdentity.getPatientIDCode();
|
|
|
eventNoCode = patientIdentity.getEventNoCode();
|
|
|
}
|
|
|
try{
|
|
|
if(!mongo.createIndex(stdDatasetCode, "patientIndex", patientIdCode, eventNoCode)) {
|
|
|
return "Mongodb索引创建失败!(表:"+stdDatasetCode+")";
|
|
|
try {
|
|
|
if (!mongo.createIndex(stdDatasetCode, "patientIndex", patientIdCode, eventNoCode)) {
|
|
|
return "Mongodb索引创建失败!(表:" + stdDatasetCode + ")";
|
|
|
}
|
|
|
if(list!=null && list.size()>0)
|
|
|
{
|
|
|
if (list != null && list.size() > 0) {
|
|
|
//字典未转换前采集到原始库
|
|
|
boolean b = mongoOrigin.insert(stdDatasetCode,translateDictCN(list, colList,schemeVersion));
|
|
|
boolean b = mongoOrigin.insert(stdDatasetCode, translateDictCN(list, colList, schemeVersion));
|
|
|
|
|
|
//字典转换
|
|
|
list = translateDict(list, colList,schemeVersion);
|
|
|
list = translateDict(list, colList, schemeVersion);
|
|
|
|
|
|
//采集到mongodb
|
|
|
b = mongo.insert(stdDatasetCode,list);
|
|
|
if(!b)
|
|
|
{
|
|
|
if(mongo.errorMessage!=null && mongo.errorMessage.length()>0)
|
|
|
{
|
|
|
b = mongo.insert(stdDatasetCode, list);
|
|
|
if (!b) {
|
|
|
if (mongo.errorMessage != null && mongo.errorMessage.length() > 0) {
|
|
|
System.out.print(mongo.errorMessage);
|
|
|
return mongo.errorMessage;
|
|
|
}
|
|
|
else {
|
|
|
return "Mongodb保存失败!(表:"+stdDatasetCode+")";
|
|
|
} else {
|
|
|
return "Mongodb保存失败!(表:" + stdDatasetCode + ")";
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
catch (Exception e)
|
|
|
{
|
|
|
} catch (Exception e) {
|
|
|
return e.getMessage();
|
|
|
}
|
|
|
|
|
@ -476,83 +556,75 @@ public class DatacollectService implements IDatacollectService {
|
|
|
|
|
|
/**
|
|
|
* 数据库表采集
|
|
|
*
|
|
|
* @return
|
|
|
*/
|
|
|
private String collectTable(DtoJobDataset ds,String schemeVersion,String logId) throws Exception
|
|
|
{
|
|
|
private String collectTable(DtoJobDataset ds, String schemeVersion, String logId) throws Exception {
|
|
|
String message = "";
|
|
|
String datasetId = ds.getJobDatasetId();
|
|
|
String jobDatasetName = ds.getJobDatasetName();
|
|
|
String condition=ds.getJobDatasetCondition();
|
|
|
String key=ds.getJobDatasetKey();
|
|
|
String keytype=ds.getJobDatasetKeytype();
|
|
|
String keyvalue=ds.getJobDatasetKeyvalue();
|
|
|
String condition = ds.getJobDatasetCondition();
|
|
|
String key = ds.getJobDatasetKey();
|
|
|
String keytype = ds.getJobDatasetKeytype();
|
|
|
String keyvalue = ds.getJobDatasetKeyvalue();
|
|
|
String orgCode = ds.getOrgCode();
|
|
|
|
|
|
String datasourceId = ds.getDatasourceId();
|
|
|
String config = ds.getConfig(); //数据库连接
|
|
|
DBHelper db = new DBHelper(datasourceId,config);
|
|
|
DBHelper db = new DBHelper(datasourceId, config);
|
|
|
DBType dbType = db.dbType;
|
|
|
|
|
|
//获取数据集映射
|
|
|
List datasetString = stdService.getDatasetByScheme(schemeVersion, datasetId);
|
|
|
JSONArray datasetList = new JSONArray(datasetString);
|
|
|
|
|
|
if(datasetList!=null &&datasetList.length()>0)
|
|
|
{
|
|
|
if (datasetList != null && datasetList.length() > 0) {
|
|
|
String stdTableName = datasetList.getJSONObject(0).optString("stdDatasetCode");
|
|
|
String adapterTableName = datasetList.getJSONObject(0).optString("adapterDatasetCode");
|
|
|
//获取数据集字段映射结构
|
|
|
List colString = stdService.getDatacolByScheme(schemeVersion,datasetId);
|
|
|
List colString = stdService.getDatacolByScheme(schemeVersion, datasetId);
|
|
|
JSONArray colList = new JSONArray(colString);
|
|
|
|
|
|
if(colList!=null && colList.length()>0)
|
|
|
{
|
|
|
if (colList != null && colList.length() > 0) {
|
|
|
//拼接查询sql
|
|
|
String strSql = "Select '" + orgCode +"' as RSCOM_ORG_CODE";
|
|
|
for(int i=0; i< colList.length();i++)
|
|
|
{
|
|
|
String strSql = "Select '" + orgCode + "' as RSCOM_ORG_CODE";
|
|
|
for (int i = 0; i < colList.length(); i++) {
|
|
|
JSONObject col = colList.getJSONObject(i);
|
|
|
String adapterMetadataCode = col.optString("adapterMetadataCode");
|
|
|
if(adapterMetadataCode.length()>0)
|
|
|
{
|
|
|
strSql+= ","+adapterMetadataCode +" as " + col.optString("stdMetadataCode") ;
|
|
|
if (adapterMetadataCode.length() > 0) {
|
|
|
strSql += "," + adapterMetadataCode + " as " + col.optString("stdMetadataCode");
|
|
|
}
|
|
|
}
|
|
|
strSql += " from " +adapterTableName;
|
|
|
strSql += " from " + adapterTableName;
|
|
|
String strWhere = " where 1=1";
|
|
|
//采集范围
|
|
|
if(condition!=null && condition.length()>0)
|
|
|
{
|
|
|
strWhere += getConditionSql(dbType,condition);
|
|
|
if (condition != null && condition.length() > 0) {
|
|
|
strWhere += getConditionSql(dbType, condition);
|
|
|
}
|
|
|
//增量采集
|
|
|
String maxKey = "0";
|
|
|
if(key!=null && key.length()>0)
|
|
|
{
|
|
|
if (key != null && key.length() > 0) {
|
|
|
maxKey = key;
|
|
|
|
|
|
if(keytype.toUpperCase().equals("DATE")) //时间类型
|
|
|
if (keytype.toUpperCase().equals("DATE")) //时间类型
|
|
|
{
|
|
|
if(keyvalue!=null && keyvalue.length()>0) {
|
|
|
if (keyvalue != null && keyvalue.length() > 0) {
|
|
|
Date keyDate = new Date();
|
|
|
//字符串转时间
|
|
|
keyDate = DateConvert.toDate(keyvalue);
|
|
|
//根据数据库类型获取时间sql
|
|
|
strWhere += " and "+ maxKey + ">'"+getDateSqlByDBType(dbType,keyDate)+"'";
|
|
|
strWhere += " and " + maxKey + ">'" + getDateSqlByDBType(dbType, keyDate) + "'";
|
|
|
}
|
|
|
|
|
|
}
|
|
|
else if(keytype.toUpperCase().equals("VARCHAR")) //字符串类型
|
|
|
} else if (keytype.toUpperCase().equals("VARCHAR")) //字符串类型
|
|
|
{
|
|
|
maxKey = getToNumberSqlByDBType(dbType,key);
|
|
|
if(keyvalue!=null && keyvalue.length()>0) {
|
|
|
strWhere += " and "+ maxKey + ">'" + keyvalue + "'";
|
|
|
maxKey = getToNumberSqlByDBType(dbType, key);
|
|
|
if (keyvalue != null && keyvalue.length() > 0) {
|
|
|
strWhere += " and " + maxKey + ">'" + keyvalue + "'";
|
|
|
}
|
|
|
}
|
|
|
else{
|
|
|
if(keyvalue!=null && keyvalue.length()>0) {
|
|
|
strWhere += " and "+ maxKey + ">'" + keyvalue + "'";
|
|
|
} else {
|
|
|
if (keyvalue != null && keyvalue.length() > 0) {
|
|
|
strWhere += " and " + maxKey + ">'" + keyvalue + "'";
|
|
|
}
|
|
|
}
|
|
|
strWhere += " order by " + maxKey;
|
|
@ -560,51 +632,43 @@ public class DatacollectService implements IDatacollectService {
|
|
|
|
|
|
strSql += strWhere;
|
|
|
//总条数
|
|
|
String sqlCount = "select count(1) as COUNT from (" + strSql+")";
|
|
|
String sqlCount = "select count(1) as COUNT from (" + strSql + ")";
|
|
|
String sqlMax = "select max(" + maxKey + ") as MAX_KEYVALUE from " + adapterTableName + strWhere;
|
|
|
JSONObject objCount = db.load(sqlCount);
|
|
|
if(objCount==null)
|
|
|
{
|
|
|
if(db.errorMessage.length()>0)
|
|
|
{
|
|
|
if (objCount == null) {
|
|
|
if (db.errorMessage.length() > 0) {
|
|
|
throw new Exception(db.errorMessage);
|
|
|
} else {
|
|
|
throw new Exception("查询异常:" + sqlCount);
|
|
|
}
|
|
|
else{
|
|
|
throw new Exception("查询异常:"+sqlCount);
|
|
|
}
|
|
|
}
|
|
|
else{
|
|
|
} else {
|
|
|
int count = objCount.getInt("COUNT");
|
|
|
|
|
|
if(count==0) //0条记录,无需采集
|
|
|
if (count == 0) //0条记录,无需采集
|
|
|
{
|
|
|
message = "0条记录,无需采集。";
|
|
|
}
|
|
|
else
|
|
|
{
|
|
|
} else {
|
|
|
//获取最大值
|
|
|
JSONObject objMax = db.load(sqlMax);
|
|
|
|
|
|
int successCount = 0;
|
|
|
String maxKeyvalue = objMax.optString("MAX_KEYVALUE");
|
|
|
//修改最大值
|
|
|
if(maxKeyvalue!=null&& maxKeyvalue.length()>0)
|
|
|
{
|
|
|
datacollectLogDao.updateJobDatasetKeyvalue(ds.getId(),maxKeyvalue);
|
|
|
LogService.getLogger().info("修改任务数据集最大值为"+maxKeyvalue+"。"); //文本日志
|
|
|
if (maxKeyvalue != null && maxKeyvalue.length() > 0) {
|
|
|
datacollectLogDao.updateJobDatasetKeyvalue(ds.getId(), maxKeyvalue);
|
|
|
logger.info("修改任务数据集最大值为" + maxKeyvalue + "。"); //文本日志
|
|
|
}
|
|
|
int countPage = 1;
|
|
|
if(count > maxNum) //分页采集
|
|
|
if (count > maxNum) //分页采集
|
|
|
{
|
|
|
countPage = count/maxNum+1;
|
|
|
countPage = count / maxNum + 1;
|
|
|
}
|
|
|
for(int i=0;i<countPage;i++)
|
|
|
{
|
|
|
for (int i = 0; i < countPage; i++) {
|
|
|
int rows = maxNum;
|
|
|
if(i+1==countPage){
|
|
|
rows = count-i*maxNum;
|
|
|
if (i + 1 == countPage) {
|
|
|
rows = count - i * maxNum;
|
|
|
}
|
|
|
String sql = getPageSqlByDBType(dbType,strSql,i*maxNum,rows); //获取分页sql语句
|
|
|
String sql = getPageSqlByDBType(dbType, strSql, i * maxNum, rows); //获取分页sql语句
|
|
|
|
|
|
RsJobLogDetail detail = new RsJobLogDetail();
|
|
|
detail.setStartTime(new Date());
|
|
@ -616,34 +680,28 @@ public class DatacollectService implements IDatacollectService {
|
|
|
detail.setJobDatasetName(ds.getJobDatasetName());
|
|
|
detail.setJobId(ds.getJobId());
|
|
|
detail.setJobSql(sql);
|
|
|
detail.setJobNum(i+1);
|
|
|
detail.setJobNum(i + 1);
|
|
|
detail.setJobDatasetRows(rows);
|
|
|
detail.setSchemeVersion(schemeVersion);
|
|
|
|
|
|
List<JSONObject> list = db.query(sql);
|
|
|
String msg = "";
|
|
|
if(list!=null)
|
|
|
{
|
|
|
msg = intoMongodb(list,schemeVersion,stdTableName,colList); //返回信息
|
|
|
}
|
|
|
else{
|
|
|
if(db.errorMessage.length()>0)
|
|
|
{
|
|
|
if (list != null) {
|
|
|
msg = intoMongodb(list, schemeVersion, stdTableName, colList); //返回信息
|
|
|
} else {
|
|
|
if (db.errorMessage.length() > 0) {
|
|
|
msg = db.errorMessage;
|
|
|
}
|
|
|
else{
|
|
|
} else {
|
|
|
msg = "查询数据为空!";
|
|
|
}
|
|
|
}
|
|
|
|
|
|
if(msg.length()>0)
|
|
|
{
|
|
|
if (msg.length() > 0) {
|
|
|
//任务日志细表异常操作
|
|
|
detail.setJobStatus("0");
|
|
|
detail.setJobContent(msg);
|
|
|
LogService.getLogger().info(msg); //文本日志
|
|
|
}
|
|
|
else{
|
|
|
logger.info(msg); //文本日志
|
|
|
} else {
|
|
|
detail.setJobStatus("1");
|
|
|
detail.setJobContent("采集成功!");
|
|
|
successCount += rows;
|
|
@ -652,31 +710,27 @@ public class DatacollectService implements IDatacollectService {
|
|
|
datacollectLogDao.saveEntity(detail);
|
|
|
}
|
|
|
|
|
|
message = jobDatasetName + "采集成功"+successCount+"条数据,总条数"+count+"条。";
|
|
|
message = jobDatasetName + "采集成功" + successCount + "条数据,总条数" + count + "条。";
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
else
|
|
|
{
|
|
|
} else {
|
|
|
throw new Exception(jobDatasetName + "数据集字段映射为空!");
|
|
|
}
|
|
|
}
|
|
|
else{
|
|
|
} else {
|
|
|
throw new Exception(jobDatasetName + "数据集映射为空!");
|
|
|
}
|
|
|
|
|
|
|
|
|
LogService.getLogger().info(message);
|
|
|
logger.info(message);
|
|
|
return message;
|
|
|
}
|
|
|
|
|
|
|
|
|
/**
|
|
|
* XML转JSONList
|
|
|
*
|
|
|
* @return
|
|
|
*/
|
|
|
private List<JSONObject> getListFromXml(String xml) throws Exception
|
|
|
{
|
|
|
private List<JSONObject> getListFromXml(String xml) throws Exception {
|
|
|
SAXReader reader = new SAXReader();
|
|
|
Document doc = reader.read(new ByteArrayInputStream(xml.getBytes("UTF-8")));
|
|
|
Element root = doc.getRootElement();
|
|
@ -684,15 +738,13 @@ public class DatacollectService implements IDatacollectService {
|
|
|
|
|
|
//xml数据列表
|
|
|
Iterator iter = root.elementIterator("Data");
|
|
|
while (iter.hasNext())
|
|
|
{
|
|
|
while (iter.hasNext()) {
|
|
|
JSONObject obj = new JSONObject();
|
|
|
Element el =(Element)iter.next();
|
|
|
Element el = (Element) iter.next();
|
|
|
Iterator cols = el.elementIterator();
|
|
|
while (cols.hasNext())
|
|
|
{
|
|
|
Element col =(Element)cols.next();
|
|
|
obj.put(col.getName().toUpperCase(),col.getStringValue());
|
|
|
while (cols.hasNext()) {
|
|
|
Element col = (Element) cols.next();
|
|
|
obj.put(col.getName().toUpperCase(), col.getStringValue());
|
|
|
}
|
|
|
re.add(obj);
|
|
|
}
|
|
@ -702,17 +754,17 @@ public class DatacollectService implements IDatacollectService {
|
|
|
|
|
|
/**
|
|
|
* webservice采集
|
|
|
*
|
|
|
* @return
|
|
|
*/
|
|
|
private String collectWebservice(DtoJobDataset ds,String schemeVersion,String logId) throws Exception
|
|
|
{
|
|
|
private String collectWebservice(DtoJobDataset ds, String schemeVersion, String logId) throws Exception {
|
|
|
String message = "";
|
|
|
String datasetId = ds.getJobDatasetId();
|
|
|
String jobDatasetName = ds.getJobDatasetName();
|
|
|
String condition=ds.getJobDatasetCondition();
|
|
|
String key=ds.getJobDatasetKey();
|
|
|
String keytype=ds.getJobDatasetKeytype();
|
|
|
String keyvalue=ds.getJobDatasetKeyvalue();
|
|
|
String condition = ds.getJobDatasetCondition();
|
|
|
String key = ds.getJobDatasetKey();
|
|
|
String keytype = ds.getJobDatasetKeytype();
|
|
|
String keyvalue = ds.getJobDatasetKeyvalue();
|
|
|
String orgCode = ds.getOrgCode();
|
|
|
|
|
|
String datasourceId = ds.getDatasourceId();
|
|
@ -721,8 +773,8 @@ public class DatacollectService implements IDatacollectService {
|
|
|
|
|
|
//webservice地址
|
|
|
ObjectMapper objectMapper = new ObjectMapper();
|
|
|
Map<String,String> mapConfig = objectMapper.readValue(config,Map.class);
|
|
|
if(mapConfig.containsKey("protocol") && mapConfig.containsKey("url")) {
|
|
|
Map<String, String> mapConfig = objectMapper.readValue(config, Map.class);
|
|
|
if (mapConfig.containsKey("protocol") && mapConfig.containsKey("url")) {
|
|
|
String url = mapConfig.get("protocol") + "://" + mapConfig.get("url");
|
|
|
//获取数据集映射
|
|
|
List datasetString = stdService.getDatasetByScheme(schemeVersion, datasetId);
|
|
@ -783,22 +835,21 @@ public class DatacollectService implements IDatacollectService {
|
|
|
}
|
|
|
strSql += strWhere;
|
|
|
//总条数和最大值查询
|
|
|
String sqlCount = "select count(1) as COUNT from (" + strSql+")";
|
|
|
String sqlCount = "select count(1) as COUNT from (" + strSql + ")";
|
|
|
String sqlMax = "select max(" + maxKey + ") as MAX_KEYVALUE from " + adapterTableName + strWhere;
|
|
|
|
|
|
//webservice获取数据总条数
|
|
|
String strCount = WebserviceUtil.request(url,"ExcuteSQL",new Object[]{"",sqlCount});
|
|
|
String strCount = WebserviceUtil.request(url, "ExcuteSQL", new Object[]{"", sqlCount});
|
|
|
List<JSONObject> dataCount = getListFromXml(strCount);
|
|
|
|
|
|
if (dataCount!=null &&dataCount.size()>0) {
|
|
|
if (dataCount != null && dataCount.size() > 0) {
|
|
|
Integer count = Integer.parseInt(dataCount.get(0).getString("COUNT"));
|
|
|
if (count == 0) //0条记录,无需采集
|
|
|
{
|
|
|
message = "0条记录,无需采集。";
|
|
|
}
|
|
|
else {
|
|
|
} else {
|
|
|
//webservice获取最大值
|
|
|
String strMax = WebserviceUtil.request(url,"ExcuteSQL",new Object[]{"",sqlMax});
|
|
|
String strMax = WebserviceUtil.request(url, "ExcuteSQL", new Object[]{"", sqlMax});
|
|
|
List<JSONObject> dataMax = getListFromXml(strCount);
|
|
|
int successCount = 0;
|
|
|
String maxKeyvalue = dataMax.get(0).getString("MAX_KEYVALUE");
|
|
@ -806,7 +857,7 @@ public class DatacollectService implements IDatacollectService {
|
|
|
//修改最大值
|
|
|
if (maxKeyvalue != null && maxKeyvalue.length() > 0) {
|
|
|
datacollectLogDao.updateJobDatasetKeyvalue(ds.getId(), maxKeyvalue);
|
|
|
LogService.getLogger().info("修改任务数据集最大值为" + maxKeyvalue + "。"); //文本日志
|
|
|
logger.info("修改任务数据集最大值为" + maxKeyvalue + "。"); //文本日志
|
|
|
}
|
|
|
int countPage = 1;
|
|
|
if (count > maxNum) //分页采集
|
|
@ -849,16 +900,14 @@ public class DatacollectService implements IDatacollectService {
|
|
|
//任务日志细表异常操作
|
|
|
detail.setJobStatus("0");
|
|
|
detail.setJobContent(msg);
|
|
|
LogService.getLogger().info(msg); //文本日志
|
|
|
logger.info(msg); //文本日志
|
|
|
} else {
|
|
|
detail.setJobStatus("1");
|
|
|
detail.setJobContent("采集成功!");
|
|
|
successCount += rows;
|
|
|
}
|
|
|
}
|
|
|
catch (Exception ex)
|
|
|
{
|
|
|
msg=ex.getMessage();
|
|
|
} catch (Exception ex) {
|
|
|
msg = ex.getMessage();
|
|
|
}
|
|
|
detail.setEndTime(new Date());
|
|
|
datacollectLogDao.saveEntity(detail);
|
|
@ -873,171 +922,11 @@ public class DatacollectService implements IDatacollectService {
|
|
|
} else {
|
|
|
throw new Exception(jobDatasetName + "数据集映射为空!");
|
|
|
}
|
|
|
}
|
|
|
else{
|
|
|
} else {
|
|
|
throw new Exception("非法webservice路径!");
|
|
|
}
|
|
|
|
|
|
LogService.getLogger().info(message);
|
|
|
logger.info(message);
|
|
|
return message;
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
* 执行任务
|
|
|
*/
|
|
|
@Override
|
|
|
public void executeJob(String jobId) throws Exception{
|
|
|
|
|
|
//获取任务详细信息
|
|
|
RsJobConfig job = datacollect.getJobById(jobId);
|
|
|
RsJobLog log = new RsJobLog();
|
|
|
log.setJobId(jobId);
|
|
|
log.setJobStartTime(new Date());
|
|
|
datacollectLogDao.saveEntity(log);
|
|
|
String logId = log.getId();
|
|
|
LogService.getLogger().info("任务"+jobId+"开始采集,新增日志"+logId+"。");
|
|
|
|
|
|
StringBuilder logStr = new StringBuilder();
|
|
|
int count = 0;
|
|
|
int success = 0;
|
|
|
|
|
|
try {
|
|
|
String schemeVersion = job.getSchemeVersion();
|
|
|
//获取任务相关数据集
|
|
|
List<DtoJobDataset> list = datacollectDao.getDatacollectDataset(jobId);
|
|
|
LogService.getLogger().info("获取任务相关数据集,数量"+list.size()+"。");
|
|
|
|
|
|
if (list != null && list.size() > 0) {
|
|
|
count = list.size();
|
|
|
logStr.append("/*********** 开始采集 *******************/\n");
|
|
|
//遍历数据集
|
|
|
for (DtoJobDataset ds : list) {
|
|
|
try {
|
|
|
String type = ds.getType();
|
|
|
String message = "";
|
|
|
logStr.append(DateConvert.toString(new Date(), dateFormat) + " " + ds.getJobDatasetName());
|
|
|
if (type != null) {
|
|
|
if (type.equals("1")) //Web Service
|
|
|
{
|
|
|
message = collectWebservice(ds, schemeVersion, logId) + "\n";
|
|
|
} else if (type.equals("2"))//文件系统
|
|
|
{
|
|
|
message = "文件系统采集。\n";
|
|
|
} else { //数据库
|
|
|
message = collectTable(ds, schemeVersion, logId) + "\n";
|
|
|
}
|
|
|
} else {
|
|
|
message = ds.getJobDatasetName() + "未关联数据源!\n";
|
|
|
}
|
|
|
|
|
|
|
|
|
LogService.getLogger().info(message); //文本日志
|
|
|
logStr.append(message);
|
|
|
success++;
|
|
|
}
|
|
|
catch (Exception ex)
|
|
|
{
|
|
|
LogService.getLogger().info("异常:" + ex.getMessage());
|
|
|
logStr.append(ex.getMessage() + "\n");
|
|
|
}
|
|
|
}
|
|
|
logStr.append("/*********** 结束采集 *******************/\n");
|
|
|
}
|
|
|
|
|
|
} catch (Exception ex) {
|
|
|
ex.printStackTrace();
|
|
|
LogService.getLogger().info("异常:" + ex.getMessage());
|
|
|
logStr.append(ex.getMessage() + "\n");
|
|
|
logStr.append("/*********** 出现异常,中断采集 *******************/\n");
|
|
|
}
|
|
|
|
|
|
|
|
|
//任务主日志成功
|
|
|
String jobContent = logStr.toString().replace("\"", "\\\"");
|
|
|
if(jobContent.length()>4000)
|
|
|
{
|
|
|
jobContent = jobContent.substring(0,4000);
|
|
|
}
|
|
|
log.setJobContent(jobContent);
|
|
|
log.setJobEndTime(new Date());
|
|
|
log.setJobDatasetCount(count);
|
|
|
log.setJobDatasetSuccess(success);
|
|
|
LogService.getLogger().info("任务结束," + count + "个数据集成功采集" + success + "个。");
|
|
|
datacollectLogDao.updateEntity(log);
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* 根据日志详细补采数据
|
|
|
*/
|
|
|
@Override
|
|
|
@Transactional
|
|
|
public ActionResult repeatJob(String id) throws Exception
|
|
|
{
|
|
|
RsJobLogDetail log = datacollectLogDao.getEntity(RsJobLogDetail.class, id);
|
|
|
|
|
|
if(log.getJobStatus().equals("2")) {
|
|
|
return new ActionResult(false,"数据补采中!");
|
|
|
}
|
|
|
if(!log.getJobStatus().equals("0")){
|
|
|
return new ActionResult(false,"数据无需补采!");
|
|
|
}
|
|
|
|
|
|
try {
|
|
|
log.setRepeatStartTime(new Date());
|
|
|
log.setJobStatus("2"); //设置采集中状态
|
|
|
datacollectLogDao.updateEntity(log);
|
|
|
}
|
|
|
catch (Exception e){
|
|
|
return new ActionResult(false,"补采失败!");
|
|
|
}
|
|
|
log.setJobStatus("0");
|
|
|
datacollectLogDao.updateEntity(log);
|
|
|
|
|
|
String stdDatasetCode = log.getStdDatasetCode();
|
|
|
String sql = log.getJobSql();
|
|
|
//数据库连接
|
|
|
String datasourceId = log.getDatasourceId();
|
|
|
String config = log.getConfig();
|
|
|
DBHelper db = new DBHelper(datasourceId,config);
|
|
|
//获取数据集字段映射结构
|
|
|
String schemeVersion = log.getSchemeVersion();
|
|
|
String datasetId = log.getJobDatasetId();
|
|
|
List colString = stdService.getDatacolByScheme(schemeVersion,datasetId);
|
|
|
JSONArray colList = new JSONArray(colString);
|
|
|
List<JSONObject> list = db.query(sql);
|
|
|
String message = intoMongodb(list,schemeVersion,stdDatasetCode,colList);
|
|
|
if(message.length()>0 || db.errorMessage.length()>0)
|
|
|
{
|
|
|
log.setJobStatus("0");
|
|
|
log.setRepeatEndTime(new Date());
|
|
|
if(message.length()>0)
|
|
|
{
|
|
|
log.setRepeatJobContent(message);
|
|
|
}
|
|
|
else{
|
|
|
db.errorMessage.length();
|
|
|
}
|
|
|
datacollectLogDao.updateEntity(log);
|
|
|
return new ActionResult(false,"补采失败!");
|
|
|
}
|
|
|
else{
|
|
|
log.setJobStatus("3");
|
|
|
log.setRepeatEndTime(new Date());
|
|
|
log.setRepeatJobContent("补采成功!");
|
|
|
datacollectLogDao.updateEntity(log);
|
|
|
return new ActionResult(true,"补采成功!");
|
|
|
}
|
|
|
}
|
|
|
|
|
|
public static void main(String[] args) throws Exception{
|
|
|
//namespace是命名空间,methodName是方法名
|
|
|
String sql = "select count(1) as COUNT,max(to_number(HDSD03_01_031)) as MAX_KEYVALUE from HDSC01_02 where 1=1 order by to_number(HDSD03_01_031)";
|
|
|
//调用web Service//输出调用结果
|
|
|
System.out.println(WebserviceUtil.request("http://172.19.103.71:8080/service/sql?wsdl", "ExcuteSQL", new Object[]{"", sql}));
|
|
|
|
|
|
}
|
|
|
}
|