|
@ -1,1207 +0,0 @@
|
|
|
package com.yihu.hos.datacollect.service;
|
|
|
|
|
|
|
|
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
|
|
import com.yihu.ehr.dbhelper.common.QueryCondition;
|
|
|
import com.yihu.ehr.dbhelper.common.enums.DBType;
|
|
|
import com.yihu.ehr.dbhelper.common.sqlparser.ParserMysql;
|
|
|
import com.yihu.ehr.dbhelper.common.sqlparser.ParserOracle;
|
|
|
import com.yihu.ehr.dbhelper.common.sqlparser.ParserSql;
|
|
|
import com.yihu.ehr.dbhelper.common.sqlparser.ParserSqlserver;
|
|
|
import com.yihu.ehr.dbhelper.jdbc.DBHelper;
|
|
|
import com.yihu.ehr.dbhelper.mongodb.MongodbHelper;
|
|
|
import com.yihu.hos.common.Services;
|
|
|
import com.yihu.hos.core.log.Logger;
|
|
|
import com.yihu.hos.core.log.LoggerFactory;
|
|
|
import com.yihu.hos.crawler.model.config.SysConfig;
|
|
|
import com.yihu.hos.crawler.model.patient.PatientIdentity;
|
|
|
import com.yihu.hos.datacollect.dao.DatacollectDao;
|
|
|
import com.yihu.hos.datacollect.dao.DatacollectLogDao;
|
|
|
import com.yihu.hos.datacollect.model.*;
|
|
|
import com.yihu.hos.resource.service.StdService;
|
|
|
import com.yihu.hos.web.framework.constant.DateConvert;
|
|
|
import com.yihu.hos.web.framework.constant.SqlConstants;
|
|
|
import com.yihu.hos.web.framework.model.ActionResult;
|
|
|
import com.yihu.hos.web.framework.util.GridFSUtil;
|
|
|
import org.bson.types.ObjectId;
|
|
|
import org.dom4j.Document;
|
|
|
import org.dom4j.Element;
|
|
|
import org.dom4j.io.SAXReader;
|
|
|
import org.json.JSONArray;
|
|
|
import org.json.JSONObject;
|
|
|
import org.springframework.beans.factory.annotation.Autowired;
|
|
|
import org.springframework.stereotype.Service;
|
|
|
import org.springframework.transaction.annotation.Transactional;
|
|
|
|
|
|
import javax.annotation.Resource;
|
|
|
import java.io.ByteArrayInputStream;
|
|
|
import java.sql.Blob;
|
|
|
import java.text.ParseException;
|
|
|
import java.text.SimpleDateFormat;
|
|
|
import java.util.*;
|
|
|
|
|
|
/**
|
|
|
* 数据采集执行服务
|
|
|
*/
|
|
|
@Service(Services.DatacollectService)
|
|
|
public class DatacollectService {
|
|
|
static private final Logger logger = LoggerFactory.getLogger(DatacollectService.class);
|
|
|
public static final String BEAN_ID = Services.DatacollectService;
|
|
|
|
|
|
MongodbHelper mongoOrigin = new MongodbHelper("origin");
|
|
|
MongodbHelper mongo = new MongodbHelper();
|
|
|
String dateFormat = "yyyy-MM-dd HH:mm:ss"; //默认时间字符串格式
|
|
|
int maxNum = 1000; //查询条数限制
|
|
|
@Resource(name = Services.Datacollect)
|
|
|
private DatacollectManager datacollect;
|
|
|
@Resource(name = StdService.BEAN_ID)
|
|
|
private StdService stdService;
|
|
|
@Resource(name = DatacollectDao.BEAN_ID)
|
|
|
private DatacollectDao datacollectDao;
|
|
|
@Resource(name = DatacollectLogDao.BEAN_ID)
|
|
|
private DatacollectLogDao datacollectLogDao;
|
|
|
@Autowired
|
|
|
private ObjectMapper objectMapper;
|
|
|
|
|
|
/**
|
|
|
* 根据连接字符串获取数据库类型
|
|
|
*/
|
|
|
private static DBType getDbType(String uri) {
|
|
|
return uri.startsWith("jdbc:mysql") ? DBType.Mysql : (uri.startsWith("jdbc:oracle") ? DBType.Oracle : (uri.startsWith("jdbc:hive2") ? DBType.Hive : (uri.startsWith("jdbc:microsoft:sqlserver") ? DBType.Sqlserver : DBType.Mysql)));
|
|
|
}
|
|
|
|
|
|
// public static void main(String[] args) throws Exception {
|
|
|
// //namespace是命名空间,methodName是方法名
|
|
|
// String sql = "select count(1) as COUNT,max(to_number(HDSD03_01_031)) as MAX_KEYVALUE from HDSC01_02 where 1=1 order by to_number(HDSD03_01_031)";
|
|
|
// //调用web Service//输出调用结果
|
|
|
// System.out.println(WebserviceUtil.request("http://172.19.103.71:8080/service/sql?wsdl", "ExcuteSQL", new Object[]{"", sql}));
|
|
|
//
|
|
|
// }
|
|
|
|
|
|
/**
|
|
|
* 执行任务
|
|
|
*/
|
|
|
public void executeJob(String jobId) throws Exception {
|
|
|
|
|
|
//获取任务详细信息
|
|
|
RsJobConfig job = datacollect.getJobById(jobId);
|
|
|
RsJobLog log = new RsJobLog();
|
|
|
log.setJobId(jobId);
|
|
|
log.setJobStartTime(new Date());
|
|
|
datacollectLogDao.saveEntity(log);
|
|
|
String logId = log.getId();
|
|
|
logger.info("任务" + jobId + "开始采集,新增日志" + logId + "。");
|
|
|
|
|
|
StringBuilder logStr = new StringBuilder();
|
|
|
int count = 0;
|
|
|
int success = 0;
|
|
|
|
|
|
try {
|
|
|
String schemeVersion = job.getSchemeVersion();
|
|
|
//获取任务相关数据集
|
|
|
List<DtoJobDataset> list = datacollectDao.getDatacollectDataset(jobId);
|
|
|
logger.info("获取任务相关数据集,数量" + list.size() + "。");
|
|
|
|
|
|
if (list != null && list.size() > 0) {
|
|
|
count = list.size();
|
|
|
logStr.append("/*********** 开始采集 *******************/\n");
|
|
|
//遍历数据集
|
|
|
for (DtoJobDataset ds : list) {
|
|
|
try {
|
|
|
String type = ds.getType();
|
|
|
String message = "";
|
|
|
logStr.append(DateConvert.toString(new Date(), dateFormat) + " " + ds.getJobDatasetName());
|
|
|
if (type != null) {
|
|
|
if (type.equals("1")) //Web Service
|
|
|
{
|
|
|
message = collectWebservice(ds, schemeVersion, logId) + "\n";
|
|
|
} else if (type.equals("2"))//文件系统
|
|
|
{
|
|
|
message = "文件系统采集。\n";
|
|
|
} else { //数据库
|
|
|
message = collectTable(ds, schemeVersion, logId) + "\n";
|
|
|
}
|
|
|
} else {
|
|
|
message = ds.getJobDatasetName() + "未关联数据源!\n";
|
|
|
}
|
|
|
|
|
|
|
|
|
logger.info(message); //文本日志
|
|
|
logStr.append(message);
|
|
|
success++;
|
|
|
} catch (Exception ex) {
|
|
|
logger.info("异常:" + ex.getMessage());
|
|
|
logStr.append(ex.getMessage() + "\n");
|
|
|
}
|
|
|
}
|
|
|
logStr.append("/*********** 结束采集 *******************/\n");
|
|
|
}
|
|
|
|
|
|
} catch (Exception ex) {
|
|
|
ex.printStackTrace();
|
|
|
logger.info("异常:" + ex.getMessage());
|
|
|
logStr.append(ex.getMessage() + "\n");
|
|
|
logStr.append("/*********** 出现异常,中断采集 *******************/\n");
|
|
|
}
|
|
|
|
|
|
|
|
|
//任务主日志成功
|
|
|
String jobContent = logStr.toString().replace("\"", "\\\"");
|
|
|
if (jobContent.length() > 4000) {
|
|
|
jobContent = jobContent.substring(0, 4000);
|
|
|
}
|
|
|
log.setJobContent(jobContent);
|
|
|
log.setJobEndTime(new Date());
|
|
|
log.setJobDatasetCount(count);
|
|
|
log.setJobDatasetSuccess(success);
|
|
|
logger.info("任务结束," + count + "个数据集成功采集" + success + "个。");
|
|
|
datacollectLogDao.updateEntity(log);
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* 根据日志详细补采数据
|
|
|
*/
|
|
|
@Transactional
|
|
|
public ActionResult repeatJob(String id) throws Exception {
|
|
|
RsJobLogDetail log = datacollectLogDao.getEntity(RsJobLogDetail.class, id);
|
|
|
|
|
|
if (log.getJobStatus().equals("2")) {
|
|
|
return new ActionResult(false, "数据补采中!");
|
|
|
}
|
|
|
if (!log.getJobStatus().equals("0")) {
|
|
|
return new ActionResult(false, "数据无需补采!");
|
|
|
}
|
|
|
|
|
|
try {
|
|
|
log.setRepeatStartTime(new Date());
|
|
|
log.setJobStatus("2"); //设置采集中状态
|
|
|
datacollectLogDao.updateEntity(log);
|
|
|
} catch (Exception e) {
|
|
|
return new ActionResult(false, "补采失败!");
|
|
|
}
|
|
|
log.setJobStatus("0");
|
|
|
datacollectLogDao.updateEntity(log);
|
|
|
|
|
|
String stdDatasetCode = log.getStdDatasetCode();
|
|
|
String sql = log.getJobSql();
|
|
|
//数据库连接
|
|
|
String datasourceId = log.getDatasourceId();
|
|
|
String config = log.getConfig();
|
|
|
DBHelper db = new DBHelper(datasourceId, config);
|
|
|
//获取数据集字段映射结构
|
|
|
String schemeVersion = log.getSchemeVersion();
|
|
|
String datasetId = log.getJobDatasetId();
|
|
|
List colString = stdService.getDatacolByScheme(schemeVersion, datasetId);
|
|
|
JSONArray colList = new JSONArray(colString);
|
|
|
List<JSONObject> list = db.query(sql);
|
|
|
String message = intoMongodb(list, schemeVersion, stdDatasetCode, colList);
|
|
|
if (message.length() > 0 || db.errorMessage.length() > 0) {
|
|
|
log.setJobStatus("0");
|
|
|
log.setRepeatEndTime(new Date());
|
|
|
if (message.length() > 0) {
|
|
|
log.setRepeatJobContent(message);
|
|
|
} else {
|
|
|
db.errorMessage.length();
|
|
|
}
|
|
|
datacollectLogDao.updateEntity(log);
|
|
|
return new ActionResult(false, "补采失败!");
|
|
|
} else {
|
|
|
log.setJobStatus("3");
|
|
|
log.setRepeatEndTime(new Date());
|
|
|
log.setRepeatJobContent("补采成功!");
|
|
|
datacollectLogDao.updateEntity(log);
|
|
|
return new ActionResult(true, "补采成功!");
|
|
|
}
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* 根据数据库类型获取时间sql
|
|
|
*
|
|
|
* @return
|
|
|
*/
|
|
|
private String getDateSqlByDBType(DBType dbType, Date date) throws Exception {
|
|
|
String val = DateConvert.toString(date, dateFormat);
|
|
|
if (dbType.equals(DBType.Mysql)) {
|
|
|
return "date_format(\'" + val + "\',\'" + dateFormat + "\')";
|
|
|
} else if (dbType.equals(DBType.Oracle)) {
|
|
|
return "to_date(\'" + val + "\',\'" + dateFormat + "\')";
|
|
|
} else {
|
|
|
return val;
|
|
|
}
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* 根据数据库类型获取转换数值型sql
|
|
|
*/
|
|
|
private String getToNumberSqlByDBType(DBType dbType, String key) throws Exception {
|
|
|
if (dbType.equals(DBType.Mysql)) {
|
|
|
return "cast(" + key + " as signed integer)";
|
|
|
} else if (dbType.equals(DBType.Oracle)) {
|
|
|
return "to_number(" + key + ")";
|
|
|
} else {
|
|
|
return key;
|
|
|
}
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* 根据数据库类型获取分页sql
|
|
|
*
|
|
|
* @return
|
|
|
*/
|
|
|
private String getPageSqlByDBType(DBType dbType, String sql, int start, int rows) throws Exception {
|
|
|
if (dbType.equals(DBType.Mysql)) {
|
|
|
return sql + " LIMIT " + start + "," + rows;
|
|
|
} else if (dbType.equals(DBType.Oracle)) {
|
|
|
return " select * from (select t.*,ROWNUM RSCOM_RN from (" + sql + ") t where ROWNUM<" + (start + rows + 1) + ") where RSCOM_RN>= " + (start + 1);
|
|
|
} else {
|
|
|
return sql;
|
|
|
}
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* 字典全转换成中文
|
|
|
*/
|
|
|
private List<JSONObject> translateDictCN(List<JSONObject> list, JSONArray colList, String schemeVersion) throws Exception {
|
|
|
//获取字典列表
|
|
|
List<DtoDictCol> dictColList = new ArrayList<>();
|
|
|
|
|
|
for (int i = 0; i < colList.length(); i++) {
|
|
|
JSONObject col = colList.getJSONObject(i);
|
|
|
String dictId = col.optString("adapterDictId");
|
|
|
if (dictId != null && dictId.length() > 0) {
|
|
|
String dictType = col.optString("adapterDataType");
|
|
|
String stdMetadataCode = col.optString("stdMetadataCode");
|
|
|
DtoDictCol dictCol = new DtoDictCol();
|
|
|
dictCol.setStdMetadataCode(stdMetadataCode);
|
|
|
dictCol.setStdDictId(dictId);
|
|
|
dictCol.setAdapterDataType(dictType.length() > 0 ? dictType : "1");//默认通过code转换字典
|
|
|
//获取字典数据
|
|
|
List dictString = stdService.getDictByScheme(schemeVersion, dictId);
|
|
|
JSONArray dictAdapterArray = new JSONArray(dictString);
|
|
|
|
|
|
dictCol.setDictList(dictAdapterArray);
|
|
|
dictColList.add(dictCol);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
//翻译列表
|
|
|
for (JSONObject data : list) {
|
|
|
//遍历字典字段
|
|
|
for (DtoDictCol col : dictColList) {
|
|
|
String colNmae = col.getStdMetadataCode();
|
|
|
String oldValue = data.optString(colNmae);
|
|
|
String newValue = translateDictValueCN(oldValue, col.getAdapterDataType(), col.getDictList());
|
|
|
|
|
|
if (newValue != null && newValue.length() > 0) {
|
|
|
data.put(colNmae, newValue);
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
|
|
|
return list;
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* 转译字典成中文
|
|
|
*
|
|
|
* @return
|
|
|
*/
|
|
|
private String translateDictValueCN(String oldValue, String type, JSONArray dictAdapterList) throws Exception {
|
|
|
if (type.equals("0")) //原本就是值
|
|
|
{
|
|
|
return oldValue;
|
|
|
}
|
|
|
|
|
|
|
|
|
//遍历字典数据(编码->名称)
|
|
|
for (int i = 0; i < dictAdapterList.length(); i++) {
|
|
|
JSONObject dictItem = dictAdapterList.getJSONObject(i);
|
|
|
if (oldValue != null && dictItem.has("stdEntryCode")) {
|
|
|
if (oldValue.equals(dictItem.getString("stdEntryCode"))) {
|
|
|
String newValue = dictItem.getString("stdEntryValue"); //名称
|
|
|
return newValue;
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
|
|
|
return oldValue;
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* 字典转换
|
|
|
*
|
|
|
* @param list
|
|
|
* @param colList
|
|
|
* @return
|
|
|
* @throws Exception
|
|
|
*/
|
|
|
private List<JSONObject> translateDict(List<JSONObject> list, JSONArray colList, String schemeVersion) throws Exception {
|
|
|
//获取字典列表
|
|
|
List<DtoDictCol> dictColList = new ArrayList<>();
|
|
|
|
|
|
for (int i = 0; i < colList.length(); i++) {
|
|
|
JSONObject col = colList.getJSONObject(i);
|
|
|
String dictId = col.optString("adapterDictId");
|
|
|
if (dictId != null && dictId.length() > 0) {
|
|
|
String dictType = col.optString("adapterDataType");
|
|
|
String stdMetadataCode = col.optString("stdMetadataCode");
|
|
|
DtoDictCol dictCol = new DtoDictCol();
|
|
|
dictCol.setStdMetadataCode(stdMetadataCode);
|
|
|
dictCol.setStdDictId(dictId);
|
|
|
dictCol.setAdapterDataType(dictType.length() > 0 ? dictType : "1");//默认通过code转换字典
|
|
|
//获取字典数据
|
|
|
List dictString = stdService.getDictByScheme(schemeVersion, dictId);
|
|
|
JSONArray dictAdapterArray = new JSONArray(dictString);
|
|
|
|
|
|
dictCol.setDictList(dictAdapterArray);
|
|
|
dictColList.add(dictCol);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
//翻译列表
|
|
|
for (JSONObject data : list) {
|
|
|
//遍历字典字段
|
|
|
for (DtoDictCol col : dictColList) {
|
|
|
String colNmae = col.getStdMetadataCode();
|
|
|
String oldValue = data.optString(colNmae);
|
|
|
String newValue = translateDictValue(oldValue, col.getAdapterDataType(), col.getDictList());
|
|
|
|
|
|
if (newValue != null && newValue.length() > 0) {
|
|
|
data.put(colNmae, newValue);
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
|
|
|
return list;
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* 转译字典
|
|
|
*
|
|
|
* @return
|
|
|
*/
|
|
|
private String translateDictValue(String oldValue, String type, JSONArray dictAdapterList) throws Exception {
|
|
|
//应用标准字段
|
|
|
String colName = "adapterEntryCode";
|
|
|
if (type.equals("0")) //通过name转译
|
|
|
{
|
|
|
colName = "adapterEntryValue";
|
|
|
}
|
|
|
|
|
|
//遍历字典数据
|
|
|
for (int i = 0; i < dictAdapterList.length(); i++) {
|
|
|
JSONObject dictItem = dictAdapterList.getJSONObject(i);
|
|
|
if (oldValue != null && dictItem.has(colName)) {
|
|
|
if (oldValue.equals(dictItem.getString(colName))) {
|
|
|
String newValue = dictItem.getString("stdEntryCode");
|
|
|
return newValue;
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
|
|
|
//找不到适配字典数据则返回空
|
|
|
return "";
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* 获取过滤条件
|
|
|
*
|
|
|
* @return
|
|
|
*/
|
|
|
private String getCondition(DBType dbType, String conditionString) {
|
|
|
JSONArray array = new JSONArray(conditionString);
|
|
|
|
|
|
if (array != null && array.length() > 0) {
|
|
|
List<QueryCondition> conditions = new ArrayList<>();
|
|
|
for (Object item : array) {
|
|
|
JSONObject obj = (JSONObject) item;
|
|
|
String logical = obj.getString("andOr");
|
|
|
String operation = obj.getString("condition");
|
|
|
String field = obj.getString("field");
|
|
|
String keyword = obj.getString("value");
|
|
|
conditions.add(new QueryCondition(logical, operation, field, keyword));
|
|
|
}
|
|
|
//条件语句转换
|
|
|
ParserSql ps;
|
|
|
switch (dbType) {
|
|
|
case Oracle:
|
|
|
ps = new ParserOracle();
|
|
|
break;
|
|
|
case Sqlserver:
|
|
|
ps = new ParserSqlserver();
|
|
|
break;
|
|
|
default:
|
|
|
ps = new ParserMysql();
|
|
|
}
|
|
|
return ps.getConditionSql(conditions);
|
|
|
}
|
|
|
return "";
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* 获取条件SQL
|
|
|
*
|
|
|
* @param dbType
|
|
|
* @param conditionString
|
|
|
* @return
|
|
|
* @throws ParseException
|
|
|
*/
|
|
|
private String getConditionSql(DBType dbType, String conditionString) throws ParseException {
|
|
|
String conditionSql = "";
|
|
|
JSONArray conditions = new JSONArray(conditionString);
|
|
|
Iterator iterator = conditions.iterator();
|
|
|
|
|
|
while (iterator.hasNext()) {
|
|
|
JSONObject condition = (JSONObject) iterator.next();
|
|
|
String logic = condition.getString("condition");
|
|
|
String andOr = condition.getString("andOr");
|
|
|
String field = condition.getString("field");
|
|
|
String value = condition.getString("value");
|
|
|
String fieldType = condition.getString("type");
|
|
|
String keys = "";
|
|
|
|
|
|
if (andOr.equals(" AND ")) {
|
|
|
conditionSql = conditionSql + " and ";
|
|
|
} else {
|
|
|
conditionSql = conditionSql + " or ";
|
|
|
}
|
|
|
|
|
|
if (logic.equals(" IN ") || logic.equals(" NOT IN ")) {
|
|
|
String[] keywords = value.split(",");
|
|
|
|
|
|
for (String key : keywords) {
|
|
|
keys += "'" + key + "',";
|
|
|
}
|
|
|
|
|
|
keys = " (" + keys.substring(0, keys.length() - 1) + ") ";
|
|
|
} else if (logic.equals(" LIKE ")) {
|
|
|
keys += " '%" + value + "%' ";
|
|
|
} else {
|
|
|
if (fieldType.equals("DATE")) {
|
|
|
keys += getDateFormatSql(dbType, value);
|
|
|
} else {
|
|
|
keys += " '" + value + "' ";
|
|
|
}
|
|
|
}
|
|
|
|
|
|
conditionSql += field + logic + keys;
|
|
|
}
|
|
|
|
|
|
return conditionSql;
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* 获取对应数据库时间格式
|
|
|
*
|
|
|
* @param dbType
|
|
|
* @param key
|
|
|
* @return
|
|
|
* @throws ParseException
|
|
|
*/
|
|
|
private String getDateFormatSql(DBType dbType, String key) throws ParseException {
|
|
|
String dateFormat = "yyyy-MM-dd HH:mm:ss";
|
|
|
SimpleDateFormat formatDate = new SimpleDateFormat("yyyy-MM-dd");
|
|
|
Date d = formatDate.parse(key);
|
|
|
SimpleDateFormat format = new SimpleDateFormat(dateFormat);
|
|
|
|
|
|
switch (dbType) {
|
|
|
case Oracle:
|
|
|
key = "to_date(\'" + format.format(d) + "\',\'YYYY-MM-DD HH24:MI:SS\')";
|
|
|
break;
|
|
|
case Sqlserver:
|
|
|
|
|
|
break;
|
|
|
default:
|
|
|
key = "date_format(\'" + format.format(d) + "\',\'%y-%m-%d %T\')";
|
|
|
}
|
|
|
|
|
|
return key;
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* 采集入库
|
|
|
*
|
|
|
* @return
|
|
|
*/
|
|
|
private String intoMongodb(List<JSONObject> list, String schemeVersion, String stdDatasetCode, JSONArray colList) {
|
|
|
String patientIdCode = SqlConstants.PATIENT_ID.toUpperCase();
|
|
|
String eventNoCode = SqlConstants.EVENT_NO.toUpperCase();
|
|
|
PatientIdentity patientIdentity = SysConfig.getInstance().getPatientIdentity(stdDatasetCode);
|
|
|
if (patientIdentity != null) {
|
|
|
patientIdCode = patientIdentity.getPatientIDCode();
|
|
|
eventNoCode = patientIdentity.getEventNoCode();
|
|
|
}
|
|
|
try {
|
|
|
if (!mongo.createIndex(stdDatasetCode, "patientIndex", patientIdCode, eventNoCode)) {
|
|
|
return "Mongodb索引创建失败!(表:" + stdDatasetCode + ")";
|
|
|
}
|
|
|
if (list != null && list.size() > 0) {
|
|
|
//字典未转换前采集到原始库
|
|
|
boolean b = mongoOrigin.insert(stdDatasetCode, translateDictCN(list, colList, schemeVersion));
|
|
|
|
|
|
//字典转换
|
|
|
list = translateDict(list, colList, schemeVersion);
|
|
|
|
|
|
//采集到mongodb
|
|
|
b = mongo.insert(stdDatasetCode, list);
|
|
|
if (!b) {
|
|
|
if (mongo.errorMessage != null && mongo.errorMessage.length() > 0) {
|
|
|
logger.debug(mongo.errorMessage);
|
|
|
return mongo.errorMessage;
|
|
|
} else {
|
|
|
return "Mongodb保存失败!(表:" + stdDatasetCode + ")";
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
} catch (Exception e) {
|
|
|
return e.getMessage();
|
|
|
}
|
|
|
|
|
|
return "";
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* 数据库表采集
|
|
|
*
|
|
|
* @return
|
|
|
*/
|
|
|
private String collectTable(DtoJobDataset ds, String schemeVersion, String logId) throws Exception {
|
|
|
String message = "";
|
|
|
String datasetId = ds.getJobDatasetId();
|
|
|
String jobDatasetName = ds.getJobDatasetName();
|
|
|
String condition = ds.getJobDatasetCondition();
|
|
|
String key = ds.getJobDatasetKey();
|
|
|
String keytype = ds.getJobDatasetKeytype();
|
|
|
String keyvalue = ds.getJobDatasetKeyvalue();
|
|
|
String orgCode = ds.getOrgCode();
|
|
|
|
|
|
String datasourceId = ds.getDatasourceId();
|
|
|
String config = ds.getConfig(); //数据库连接
|
|
|
DBHelper db = new DBHelper(datasourceId, config);
|
|
|
DBType dbType = db.dbType;
|
|
|
|
|
|
//获取数据集映射
|
|
|
List datasetString = stdService.getDatasetByScheme(schemeVersion, datasetId);
|
|
|
JSONArray datasetList = new JSONArray(datasetString);
|
|
|
|
|
|
if (datasetList != null && datasetList.length() > 0) {
|
|
|
String stdTableName = datasetList.getJSONObject(0).optString("stdDatasetCode");
|
|
|
String adapterTableName = datasetList.getJSONObject(0).optString("adapterDatasetCode");
|
|
|
//获取数据集字段映射结构
|
|
|
List colString = stdService.getDatacolByScheme(schemeVersion, datasetId);
|
|
|
JSONArray colList = new JSONArray(colString);
|
|
|
|
|
|
if (colList != null && colList.length() > 0) {
|
|
|
//拼接查询sql
|
|
|
String strSql = "Select '" + orgCode + "' as RSCOM_ORG_CODE";
|
|
|
for (int i = 0; i < colList.length(); i++) {
|
|
|
JSONObject col = colList.getJSONObject(i);
|
|
|
String adapterMetadataCode = col.optString("adapterMetadataCode");
|
|
|
if (adapterMetadataCode.length() > 0) {
|
|
|
strSql += "," + adapterMetadataCode + " as " + col.optString("stdMetadataCode");
|
|
|
}
|
|
|
}
|
|
|
strSql += " from " + adapterTableName;
|
|
|
String strWhere = " where 1=1";
|
|
|
//采集范围
|
|
|
if (condition != null && condition.length() > 0) {
|
|
|
strWhere += getConditionSql(dbType, condition);
|
|
|
}
|
|
|
//增量采集
|
|
|
String maxKey = "0";
|
|
|
if (key != null && key.length() > 0) {
|
|
|
maxKey = key;
|
|
|
|
|
|
if (keytype.toUpperCase().equals("DATE")) //时间类型
|
|
|
{
|
|
|
if (keyvalue != null && keyvalue.length() > 0) {
|
|
|
Date keyDate = new Date();
|
|
|
//字符串转时间
|
|
|
keyDate = DateConvert.toDate(keyvalue);
|
|
|
//根据数据库类型获取时间sql
|
|
|
strWhere += " and " + maxKey + ">'" + getDateSqlByDBType(dbType, keyDate) + "'";
|
|
|
}
|
|
|
|
|
|
} else if (keytype.toUpperCase().equals("VARCHAR")) //字符串类型
|
|
|
{
|
|
|
maxKey = getToNumberSqlByDBType(dbType, key);
|
|
|
if (keyvalue != null && keyvalue.length() > 0) {
|
|
|
strWhere += " and " + maxKey + ">'" + keyvalue + "'";
|
|
|
}
|
|
|
} else {
|
|
|
if (keyvalue != null && keyvalue.length() > 0) {
|
|
|
strWhere += " and " + maxKey + ">'" + keyvalue + "'";
|
|
|
}
|
|
|
}
|
|
|
strWhere += " order by " + maxKey;
|
|
|
}
|
|
|
|
|
|
strSql += strWhere;
|
|
|
//总条数
|
|
|
String sqlCount = "select count(1) as COUNT from (" + strSql + ")";
|
|
|
String sqlMax = "select max(" + maxKey + ") as MAX_KEYVALUE from " + adapterTableName + strWhere;
|
|
|
JSONObject objCount = db.load(sqlCount);
|
|
|
if (objCount == null) {
|
|
|
if (db.errorMessage.length() > 0) {
|
|
|
throw new Exception(db.errorMessage);
|
|
|
} else {
|
|
|
throw new Exception("查询异常:" + sqlCount);
|
|
|
}
|
|
|
} else {
|
|
|
int count = objCount.getInt("COUNT");
|
|
|
|
|
|
if (count == 0) //0条记录,无需采集
|
|
|
{
|
|
|
message = "0条记录,无需采集。";
|
|
|
} else {
|
|
|
//获取最大值
|
|
|
JSONObject objMax = db.load(sqlMax);
|
|
|
|
|
|
int successCount = 0;
|
|
|
String maxKeyvalue = objMax.optString("MAX_KEYVALUE");
|
|
|
//修改最大值
|
|
|
if (maxKeyvalue != null && maxKeyvalue.length() > 0) {
|
|
|
datacollectLogDao.updateJobDatasetKeyvalue(ds.getId(), maxKeyvalue);
|
|
|
logger.info("修改任务数据集最大值为" + maxKeyvalue + "。"); //文本日志
|
|
|
}
|
|
|
int countPage = 1;
|
|
|
if (count > maxNum) //分页采集
|
|
|
{
|
|
|
countPage = count / maxNum + 1;
|
|
|
}
|
|
|
for (int i = 0; i < countPage; i++) {
|
|
|
int rows = maxNum;
|
|
|
if (i + 1 == countPage) {
|
|
|
rows = count - i * maxNum;
|
|
|
}
|
|
|
String sql = getPageSqlByDBType(dbType, strSql, i * maxNum, rows); //获取分页sql语句
|
|
|
|
|
|
RsJobLogDetail detail = new RsJobLogDetail();
|
|
|
detail.setStartTime(new Date());
|
|
|
detail.setJobLogId(logId);
|
|
|
detail.setDatasourceId(datasourceId);
|
|
|
detail.setConfig(config);
|
|
|
detail.setStdDatasetCode(stdTableName);
|
|
|
detail.setJobDatasetId(datasetId);
|
|
|
detail.setJobDatasetName(ds.getJobDatasetName());
|
|
|
detail.setJobId(ds.getJobId());
|
|
|
detail.setJobSql(sql);
|
|
|
detail.setJobNum(i + 1);
|
|
|
detail.setJobDatasetRows(rows);
|
|
|
detail.setSchemeVersion(schemeVersion);
|
|
|
|
|
|
List<JSONObject> list = db.query(sql);
|
|
|
String msg = "";
|
|
|
if (list != null) {
|
|
|
msg = intoMongodb(list, schemeVersion, stdTableName, colList); //返回信息
|
|
|
} else {
|
|
|
if (db.errorMessage.length() > 0) {
|
|
|
msg = db.errorMessage;
|
|
|
} else {
|
|
|
msg = "查询数据为空!";
|
|
|
}
|
|
|
}
|
|
|
|
|
|
if (msg.length() > 0) {
|
|
|
//任务日志细表异常操作
|
|
|
detail.setJobStatus("0");
|
|
|
detail.setJobContent(msg);
|
|
|
logger.info(msg); //文本日志
|
|
|
} else {
|
|
|
detail.setJobStatus("1");
|
|
|
detail.setJobContent("采集成功!");
|
|
|
successCount += rows;
|
|
|
}
|
|
|
detail.setEndTime(new Date());
|
|
|
datacollectLogDao.saveEntity(detail);
|
|
|
}
|
|
|
|
|
|
message = jobDatasetName + "采集成功" + successCount + "条数据,总条数" + count + "条。";
|
|
|
}
|
|
|
}
|
|
|
} else {
|
|
|
throw new Exception(jobDatasetName + "数据集字段映射为空!");
|
|
|
}
|
|
|
} else {
|
|
|
throw new Exception(jobDatasetName + "数据集映射为空!");
|
|
|
}
|
|
|
|
|
|
|
|
|
logger.info(message);
|
|
|
return message;
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* XML转JSONList
|
|
|
*
|
|
|
* @return
|
|
|
*/
|
|
|
private List<JSONObject> getListFromXml(String xml) throws Exception {
|
|
|
SAXReader reader = new SAXReader();
|
|
|
Document doc = reader.read(new ByteArrayInputStream(xml.getBytes("UTF-8")));
|
|
|
Element root = doc.getRootElement();
|
|
|
List<JSONObject> re = new ArrayList<>();
|
|
|
|
|
|
//xml数据列表
|
|
|
Iterator iter = root.elementIterator("Data");
|
|
|
while (iter.hasNext()) {
|
|
|
JSONObject obj = new JSONObject();
|
|
|
Element el = (Element) iter.next();
|
|
|
Iterator cols = el.elementIterator();
|
|
|
while (cols.hasNext()) {
|
|
|
Element col = (Element) cols.next();
|
|
|
obj.put(col.getName().toUpperCase(), col.getStringValue());
|
|
|
}
|
|
|
re.add(obj);
|
|
|
}
|
|
|
|
|
|
return re;
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* webservice采集
|
|
|
*
|
|
|
* @return
|
|
|
*/
|
|
|
private String collectWebservice(DtoJobDataset ds, String schemeVersion, String logId) throws Exception {
|
|
|
String message = "";
|
|
|
String datasetId = ds.getJobDatasetId();
|
|
|
String jobDatasetName = ds.getJobDatasetName();
|
|
|
String condition = ds.getJobDatasetCondition();
|
|
|
String key = ds.getJobDatasetKey();
|
|
|
String keytype = ds.getJobDatasetKeytype();
|
|
|
String keyvalue = ds.getJobDatasetKeyvalue();
|
|
|
String orgCode = ds.getOrgCode();
|
|
|
|
|
|
String datasourceId = ds.getDatasourceId();
|
|
|
String config = ds.getConfig(); //数据库连接
|
|
|
DBType dbType = DBType.Oracle;//********** 先定死Oracle ****************************
|
|
|
|
|
|
//webservice地址
|
|
|
Map<String, String> mapConfig = objectMapper.readValue(config, Map.class);
|
|
|
if (mapConfig.containsKey("protocol") && mapConfig.containsKey("url")) {
|
|
|
String url = mapConfig.get("protocol") + "://" + mapConfig.get("url");
|
|
|
//获取数据集映射
|
|
|
List datasetString = stdService.getDatasetByScheme(schemeVersion, datasetId);
|
|
|
JSONArray datasetList = new JSONArray(datasetString);
|
|
|
|
|
|
if (datasetList != null && datasetList.length() > 0) {
|
|
|
String stdTableName = datasetList.getJSONObject(0).optString("stdDatasetCode");
|
|
|
String adapterTableName = datasetList.getJSONObject(0).optString("adapterDatasetCode");
|
|
|
//获取数据集字段映射结构
|
|
|
List colString = stdService.getDatacolByScheme(schemeVersion, datasetId);
|
|
|
JSONArray colList = new JSONArray(colString);
|
|
|
|
|
|
if (colList != null && colList.length() > 0) {
|
|
|
//拼接查询sql
|
|
|
String strSql = "Select '" + orgCode + "' as RSCOM_ORG_CODE";
|
|
|
for (int i = 0; i < colList.length(); i++) {
|
|
|
JSONObject col = colList.getJSONObject(i);
|
|
|
String adapterMetadataCode = col.optString("adapterMetadataCode");
|
|
|
if (adapterMetadataCode.length() > 0) {
|
|
|
strSql += "," + adapterMetadataCode + " as " + col.optString("stdMetadataCode");
|
|
|
}
|
|
|
}
|
|
|
strSql += " from " + adapterTableName;
|
|
|
String strWhere = " where 1=1";
|
|
|
//采集范围
|
|
|
if (condition != null && condition.length() > 0) {
|
|
|
strWhere += getConditionSql(dbType, condition);
|
|
|
}
|
|
|
//增量采集
|
|
|
String maxKey = "0";
|
|
|
String keyValue = ds.getJobDatasetKeyvalue();
|
|
|
if (key != null && key.length() > 0) {
|
|
|
maxKey = key;
|
|
|
if (keytype.toUpperCase().equals("DATE")) //时间类型
|
|
|
{
|
|
|
Date keyDate = new Date();
|
|
|
if (keyvalue != null && keyvalue.length() > 0) {
|
|
|
//字符串转时间
|
|
|
keyDate = DateConvert.toDate(keyvalue);
|
|
|
//根据数据库类型获取时间sql
|
|
|
strWhere += " and " + key + ">'" + getDateSqlByDBType(dbType, keyDate) + "'";
|
|
|
strWhere += " order by " + key;
|
|
|
}
|
|
|
|
|
|
} else if (keytype.toUpperCase().equals("VARCHAR")) //字符串类型
|
|
|
{
|
|
|
maxKey = getToNumberSqlByDBType(dbType, key);
|
|
|
if (keyvalue != null && keyvalue.length() > 0) {
|
|
|
strWhere += " and " + maxKey + ">'" + keyvalue + "'";
|
|
|
strWhere += " order by " + maxKey;
|
|
|
}
|
|
|
} else {
|
|
|
if (keyvalue != null && keyvalue.length() > 0) {
|
|
|
strWhere += " and " + key + ">'" + keyvalue + "'";
|
|
|
strWhere += " order by " + key;
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
strSql += strWhere;
|
|
|
//总条数和最大值查询
|
|
|
String sqlCount = "select count(1) as COUNT from (" + strSql + ")";
|
|
|
String sqlMax = "select max(" + maxKey + ") as MAX_KEYVALUE from " + adapterTableName + strWhere;
|
|
|
|
|
|
//webservice获取数据总条数
|
|
|
String strCount = "";//WebserviceUtil.request(url, "ExcuteSQL", new Object[]{"", sqlCount});
|
|
|
List<JSONObject> dataCount = getListFromXml(strCount);
|
|
|
|
|
|
if (dataCount != null && dataCount.size() > 0) {
|
|
|
Integer count = Integer.parseInt(dataCount.get(0).getString("COUNT"));
|
|
|
if (count == 0) //0条记录,无需采集
|
|
|
{
|
|
|
message = "0条记录,无需采集。";
|
|
|
} else {
|
|
|
//webservice获取最大值
|
|
|
String strMax = ""; //WebserviceUtil.request(url, "ExcuteSQL", new Object[]{"", sqlMax});
|
|
|
List<JSONObject> dataMax = getListFromXml(strCount);
|
|
|
int successCount = 0;
|
|
|
String maxKeyvalue = dataMax.get(0).getString("MAX_KEYVALUE");
|
|
|
|
|
|
//修改最大值
|
|
|
if (maxKeyvalue != null && maxKeyvalue.length() > 0) {
|
|
|
datacollectLogDao.updateJobDatasetKeyvalue(ds.getId(), maxKeyvalue);
|
|
|
logger.info("修改任务数据集最大值为" + maxKeyvalue + "。"); //文本日志
|
|
|
}
|
|
|
int countPage = 1;
|
|
|
if (count > maxNum) //分页采集
|
|
|
{
|
|
|
countPage = count / maxNum + 1;
|
|
|
}
|
|
|
for (int i = 0; i < countPage; i++) {
|
|
|
int rows = maxNum;
|
|
|
if (i + 1 == countPage) {
|
|
|
rows = count - i * maxNum;
|
|
|
}
|
|
|
String sql = getPageSqlByDBType(dbType, strSql, i * maxNum, rows); //获取分页sql语句
|
|
|
|
|
|
RsJobLogDetail detail = new RsJobLogDetail();
|
|
|
detail.setStartTime(new Date());
|
|
|
detail.setJobLogId(logId);
|
|
|
detail.setDatasourceId(datasourceId);
|
|
|
detail.setConfig(config);
|
|
|
detail.setStdDatasetCode(stdTableName);
|
|
|
detail.setJobDatasetId(datasetId);
|
|
|
detail.setJobDatasetName(ds.getJobDatasetName());
|
|
|
detail.setJobId(ds.getJobId());
|
|
|
detail.setJobSql(sql);
|
|
|
detail.setJobNum(i + 1);
|
|
|
detail.setJobDatasetRows(rows);
|
|
|
detail.setSchemeVersion(schemeVersion);
|
|
|
|
|
|
String msg = "";
|
|
|
try {
|
|
|
//获取分页数据
|
|
|
String strList = ""; //WebserviceUtil.request(url, "ExcuteSQL", new Object[]{"", sql});
|
|
|
List<JSONObject> list = getListFromXml(strList);
|
|
|
if (list != null) {
|
|
|
msg = intoMongodb(list, schemeVersion, stdTableName, colList); //返回信息
|
|
|
} else {
|
|
|
msg = "查询数据为空!";
|
|
|
}
|
|
|
|
|
|
if (msg.length() > 0) {
|
|
|
//任务日志细表异常操作
|
|
|
detail.setJobStatus("0");
|
|
|
detail.setJobContent(msg);
|
|
|
logger.info(msg); //文本日志
|
|
|
} else {
|
|
|
detail.setJobStatus("1");
|
|
|
detail.setJobContent("采集成功!");
|
|
|
successCount += rows;
|
|
|
}
|
|
|
} catch (Exception ex) {
|
|
|
msg = ex.getMessage();
|
|
|
}
|
|
|
detail.setEndTime(new Date());
|
|
|
datacollectLogDao.saveEntity(detail);
|
|
|
}
|
|
|
|
|
|
message = jobDatasetName + "采集成功" + successCount + "条数据,总条数" + count + "条。";
|
|
|
}
|
|
|
}
|
|
|
} else {
|
|
|
throw new Exception(jobDatasetName + "数据集字段映射为空!");
|
|
|
}
|
|
|
} else {
|
|
|
throw new Exception(jobDatasetName + "数据集映射为空!");
|
|
|
}
|
|
|
} else {
|
|
|
throw new Exception("非法webservice路径!");
|
|
|
}
|
|
|
|
|
|
logger.info(message);
|
|
|
return message;
|
|
|
}
|
|
|
|
|
|
|
|
|
/**
|
|
|
* 采集入库(包含blob字段处理)
|
|
|
* @return
|
|
|
*/
|
|
|
private String intoMongodb2(List<JSONObject> list,String schemeVersion,String stdDatasetCode,JSONArray colList)
|
|
|
{
|
|
|
String patientIdCode = SqlConstants.PATIENT_ID.toUpperCase();
|
|
|
String eventNoCode = SqlConstants.EVENT_NO.toUpperCase();
|
|
|
PatientIdentity patientIdentity = SysConfig.getInstance().getPatientIdentity(stdDatasetCode);
|
|
|
if (patientIdentity != null) {
|
|
|
patientIdCode = patientIdentity.getPatientIDCode();
|
|
|
eventNoCode = patientIdentity.getEventNoCode();
|
|
|
}
|
|
|
try{
|
|
|
if(!mongo.createIndex(stdDatasetCode, "patientIndex", patientIdCode, eventNoCode)) {
|
|
|
return "Mongodb索引创建失败!(表:"+stdDatasetCode+")";
|
|
|
}
|
|
|
if(list!=null && list.size()>0)
|
|
|
{
|
|
|
|
|
|
//TODO TOSET 判断是否是非结构化数据集
|
|
|
if ("unstructured".equals(stdDatasetCode)){
|
|
|
for (JSONObject jsonObject:list) {
|
|
|
//文件内容保存到GridFS,细表内容字段保存为文件objctId
|
|
|
Blob blob = (Blob) jsonObject.get("CONTENT");
|
|
|
String type = (String) jsonObject.get("FILE_TYPE");
|
|
|
String patientId= (String) jsonObject.get("patient_id");
|
|
|
String eventNo= (String) jsonObject.get("event_no");
|
|
|
Map<String,Object> params = new HashMap<>();
|
|
|
params.put("patient_id",patientId);
|
|
|
params.put("event_no",eventNo);
|
|
|
try {
|
|
|
ObjectId objectId = GridFSUtil.uploadFile("files", blob, type, params);
|
|
|
jsonObject.put("CONTENT", objectId);
|
|
|
} catch (Exception e) {
|
|
|
e.printStackTrace();
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
//字典未转换前采集到原始库
|
|
|
boolean b = mongoOrigin.insert(stdDatasetCode,translateDictCN(list, colList,schemeVersion));
|
|
|
|
|
|
//字典转换
|
|
|
list = translateDict(list, colList,schemeVersion);
|
|
|
|
|
|
//采集到mongodb
|
|
|
b = mongo.insert(stdDatasetCode,list);
|
|
|
if(!b)
|
|
|
{
|
|
|
if(mongo.errorMessage!=null && mongo.errorMessage.length()>0)
|
|
|
{
|
|
|
System.out.print(mongo.errorMessage);
|
|
|
return mongo.errorMessage;
|
|
|
}
|
|
|
else {
|
|
|
return "Mongodb保存失败!(表:"+stdDatasetCode+")";
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
catch (Exception e)
|
|
|
{
|
|
|
return e.getMessage();
|
|
|
}
|
|
|
|
|
|
return "";
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* 数据库采集(包含Blob类型数据)
|
|
|
* @param ds
|
|
|
* @param schemeVersion
|
|
|
* @param logId
|
|
|
* @return
|
|
|
* @throws Exception
|
|
|
*/
|
|
|
private String collectBlobTable(DtoJobDataset ds,String schemeVersion,String logId) throws Exception
|
|
|
{
|
|
|
String message = "";
|
|
|
String datasetId = ds.getJobDatasetId();
|
|
|
String jobDatasetName = ds.getJobDatasetName();
|
|
|
String condition=ds.getJobDatasetCondition();
|
|
|
String key=ds.getJobDatasetKey();
|
|
|
String keytype=ds.getJobDatasetKeytype();
|
|
|
String keyvalue=ds.getJobDatasetKeyvalue();
|
|
|
String orgCode = ds.getOrgCode();
|
|
|
|
|
|
String datasourceId = ds.getDatasourceId();
|
|
|
String config = ds.getConfig(); //数据库连接
|
|
|
DBHelper db = new DBHelper(datasourceId,config);
|
|
|
DBType dbType = db.dbType;
|
|
|
|
|
|
//获取数据集映射
|
|
|
List datasetString = stdService.getDatasetByScheme(schemeVersion, datasetId);
|
|
|
JSONArray datasetList = new JSONArray(datasetString);
|
|
|
|
|
|
if(datasetList!=null &&datasetList.length()>0)
|
|
|
{
|
|
|
String stdTableName = datasetList.getJSONObject(0).optString("stdDatasetCode");
|
|
|
String adapterTableName = datasetList.getJSONObject(0).optString("adapterDatasetCode");
|
|
|
//获取数据集字段映射结构
|
|
|
List colString = stdService.getDatacolByScheme(schemeVersion,datasetId);
|
|
|
JSONArray colList = new JSONArray(colString);
|
|
|
|
|
|
if(colList!=null && colList.length()>0)
|
|
|
{
|
|
|
//拼接查询sql
|
|
|
String strSql = "Select '" + orgCode +"' as RSCOM_ORG_CODE";
|
|
|
for(int i=0; i< colList.length();i++)
|
|
|
{
|
|
|
JSONObject col = colList.getJSONObject(i);
|
|
|
String adapterMetadataCode = col.optString("adapterMetadataCode");
|
|
|
if(adapterMetadataCode.length()>0)
|
|
|
{
|
|
|
strSql+= ","+adapterMetadataCode +" as " + col.optString("stdMetadataCode") ;
|
|
|
}
|
|
|
}
|
|
|
strSql += " from " +adapterTableName;
|
|
|
String strWhere = " where 1=1";
|
|
|
//采集范围
|
|
|
if(condition!=null && condition.length()>0)
|
|
|
{
|
|
|
strWhere += getConditionSql(dbType,condition);
|
|
|
}
|
|
|
//增量采集
|
|
|
String maxKey = "0";
|
|
|
if(key!=null && key.length()>0)
|
|
|
{
|
|
|
maxKey = key;
|
|
|
|
|
|
if(keytype.toUpperCase().equals("DATE")) //时间类型
|
|
|
{
|
|
|
if(keyvalue!=null && keyvalue.length()>0) {
|
|
|
Date keyDate = new Date();
|
|
|
//字符串转时间
|
|
|
keyDate = DateConvert.toDate(keyvalue);
|
|
|
//根据数据库类型获取时间sql
|
|
|
strWhere += " and "+ maxKey + ">'"+getDateSqlByDBType(dbType,keyDate)+"'";
|
|
|
}
|
|
|
|
|
|
}
|
|
|
else if(keytype.toUpperCase().equals("VARCHAR")) //字符串类型
|
|
|
{
|
|
|
maxKey = getToNumberSqlByDBType(dbType,key);
|
|
|
if(keyvalue!=null && keyvalue.length()>0) {
|
|
|
strWhere += " and "+ maxKey + ">'" + keyvalue + "'";
|
|
|
}
|
|
|
}
|
|
|
else{
|
|
|
if(keyvalue!=null && keyvalue.length()>0) {
|
|
|
strWhere += " and "+ maxKey + ">'" + keyvalue + "'";
|
|
|
}
|
|
|
}
|
|
|
strWhere += " order by " + maxKey;
|
|
|
}
|
|
|
|
|
|
strSql += strWhere;
|
|
|
//总条数
|
|
|
String sqlCount = "select count(1) as COUNT from (" + strSql+")";
|
|
|
String sqlMax = "select max(" + maxKey + ") as MAX_KEYVALUE from " + adapterTableName + strWhere;
|
|
|
JSONObject objCount = db.load(sqlCount);
|
|
|
if(objCount==null)
|
|
|
{
|
|
|
if(db.errorMessage.length()>0)
|
|
|
{
|
|
|
throw new Exception(db.errorMessage);
|
|
|
}
|
|
|
else{
|
|
|
throw new Exception("查询异常:"+sqlCount);
|
|
|
}
|
|
|
}
|
|
|
else{
|
|
|
int count = objCount.getInt("COUNT");
|
|
|
|
|
|
if(count==0) //0条记录,无需采集
|
|
|
{
|
|
|
message = "0条记录,无需采集。";
|
|
|
}
|
|
|
else
|
|
|
{
|
|
|
//获取最大值
|
|
|
JSONObject objMax = db.load(sqlMax);
|
|
|
|
|
|
int successCount = 0;
|
|
|
String maxKeyvalue = objMax.optString("MAX_KEYVALUE");
|
|
|
//修改最大值
|
|
|
if(maxKeyvalue!=null&& maxKeyvalue.length()>0)
|
|
|
{
|
|
|
datacollectLogDao.updateJobDatasetKeyvalue(ds.getId(),maxKeyvalue);
|
|
|
logger.info("修改任务数据集最大值为"+maxKeyvalue+"。"); //文本日志
|
|
|
}
|
|
|
int countPage = 1;
|
|
|
if(count > maxNum) //分页采集
|
|
|
{
|
|
|
countPage = count/maxNum+1;
|
|
|
}
|
|
|
for(int i=0;i<countPage;i++)
|
|
|
{
|
|
|
int rows = maxNum;
|
|
|
if(i+1==countPage){
|
|
|
rows = count-i*maxNum;
|
|
|
}
|
|
|
String sql = getPageSqlByDBType(dbType,strSql,i*maxNum,rows); //获取分页sql语句
|
|
|
|
|
|
RsJobLogDetail detail = new RsJobLogDetail();
|
|
|
detail.setStartTime(new Date());
|
|
|
detail.setJobLogId(logId);
|
|
|
detail.setDatasourceId(datasourceId);
|
|
|
detail.setConfig(config);
|
|
|
detail.setStdDatasetCode(stdTableName);
|
|
|
detail.setJobDatasetId(datasetId);
|
|
|
detail.setJobDatasetName(ds.getJobDatasetName());
|
|
|
detail.setJobId(ds.getJobId());
|
|
|
detail.setJobSql(sql);
|
|
|
detail.setJobNum(i+1);
|
|
|
detail.setJobDatasetRows(rows);
|
|
|
detail.setSchemeVersion(schemeVersion);
|
|
|
|
|
|
List<JSONObject> list = db.query(sql);
|
|
|
String msg = "";
|
|
|
if(list!=null)
|
|
|
{
|
|
|
msg = intoMongodb2(list,schemeVersion,stdTableName,colList); //返回信息
|
|
|
}
|
|
|
else{
|
|
|
if(db.errorMessage.length()>0)
|
|
|
{
|
|
|
msg = db.errorMessage;
|
|
|
}
|
|
|
else{
|
|
|
msg = "查询数据为空!";
|
|
|
}
|
|
|
}
|
|
|
|
|
|
if(msg.length()>0)
|
|
|
{
|
|
|
//任务日志细表异常操作
|
|
|
detail.setJobStatus("0");
|
|
|
detail.setJobContent(msg);
|
|
|
}
|
|
|
else{
|
|
|
detail.setJobStatus("1");
|
|
|
detail.setJobContent("采集成功!");
|
|
|
successCount += rows;
|
|
|
}
|
|
|
detail.setEndTime(new Date());
|
|
|
datacollectLogDao.saveEntity(detail);
|
|
|
}
|
|
|
|
|
|
message = jobDatasetName + "采集成功"+successCount+"条数据,总条数"+count+"条。";
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
else
|
|
|
{
|
|
|
throw new Exception(jobDatasetName + "数据集字段映射为空!");
|
|
|
}
|
|
|
}
|
|
|
else{
|
|
|
throw new Exception(jobDatasetName + "数据集映射为空!");
|
|
|
}
|
|
|
|
|
|
|
|
|
return message;
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
}
|