浏览代码

collectHelper删除

demon 8 年之前
父节点
当前提交
dc051b4721

+ 4 - 1
hos-broker/src/main/java/com/yihu/hos/broker/services/camel/ESBCamelService.java

@ -49,13 +49,15 @@ public class ESBCamelService {
    private static Logger logger = LogManager.getLogger(ESBCamelService.class);
    @Autowired
    private MongoConfiguration mongoConfig;
    private String dbName = "upload";
    private String serviceFlow = "serviceFlow";
    private String configuration = "configuration";
    @Autowired
    private ObjectMapper objectMapper;
    @Value("${hos.esb.rest-url}")
    private String centerUrl;
    @Value("${spring.data.mongodb.gridFsDatabase}")
    private String dbName;
    public Result onServiceFlowAdd(String msg) {
        try {
@ -275,6 +277,7 @@ public class ESBCamelService {
            String packagePath = StringUtil.replaceStrAll(handleFile.getPackageName(), ".", "/");
            String classPath =  resource.getPath()+ packagePath + "/" + handleFile.getClassName() + ClassFileUtil.CLASS_FILE;
            GridFSUtil.uploadFile(classPath, handleFile.getClassName()+handleFile.getRouteCode() + ClassFileUtil.CLASS_FILE,null);
            //TODO 上传到本地mongodb和中心mongodb
        }
        return succ;
    }

+ 1 - 1
hos-broker/src/main/resources/application.yml

@ -19,7 +19,7 @@ spring:
  profiles: dev
  datasource:
    driverClassName: com.mysql.jdbc.Driver
    url: jdbc:mysql://192.168.1.220:8066/hos1?useUnicode=true&characterEncoding=utf-8&zeroDateTimeBehavior=convertToNull&transformedBitIsBoolean=true
    url: jdbc:mysql://192.168.1.220:8066/hos2?useUnicode=true&characterEncoding=utf-8&zeroDateTimeBehavior=convertToNull&transformedBitIsBoolean=true
    username: hos
    password: 123456
    test-on-borrow: true

+ 9 - 0
hos-web-framework/src/main/java/com/yihu/hos/web/framework/model/bo/ServiceFlow.java

@ -18,6 +18,15 @@ public class ServiceFlow {
    private ArrayList<HandleFile> handleFiles;
    private Date updated;
    private String flowType;    //pull or push?
    private String tenant;
    public String getTenant() {
        return tenant;
    }
    public void setTenant(String tenant) {
        this.tenant = tenant;
    }
    public String getId() {
        return id;

+ 4 - 2
hos-web-framework/src/main/java/com/yihu/hos/web/framework/util/GridFSUtil.java

@ -291,7 +291,9 @@ public class GridFSUtil {
            if (gridFSFile != null) {
                return saveFileName;
            }
        } finally {
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            if (inputStream != null) {
                try {
                    inputStream.close();
@ -299,8 +301,8 @@ public class GridFSUtil {
                    e.printStackTrace();
                }
            }
            return null;
        }
        return null;
    }
    /**

文件差异内容过多而无法显示
+ 0 - 205
src/main/java/com/yihu/hos/common/CollectHelper.java


+ 0 - 1207
src/main/java/com/yihu/hos/datacollect/service/DatacollectService.java

@ -1,1207 +0,0 @@
package com.yihu.hos.datacollect.service;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.yihu.ehr.dbhelper.common.QueryCondition;
import com.yihu.ehr.dbhelper.common.enums.DBType;
import com.yihu.ehr.dbhelper.common.sqlparser.ParserMysql;
import com.yihu.ehr.dbhelper.common.sqlparser.ParserOracle;
import com.yihu.ehr.dbhelper.common.sqlparser.ParserSql;
import com.yihu.ehr.dbhelper.common.sqlparser.ParserSqlserver;
import com.yihu.ehr.dbhelper.jdbc.DBHelper;
import com.yihu.ehr.dbhelper.mongodb.MongodbHelper;
import com.yihu.hos.common.Services;
import com.yihu.hos.core.log.Logger;
import com.yihu.hos.core.log.LoggerFactory;
import com.yihu.hos.crawler.model.config.SysConfig;
import com.yihu.hos.crawler.model.patient.PatientIdentity;
import com.yihu.hos.datacollect.dao.DatacollectDao;
import com.yihu.hos.datacollect.dao.DatacollectLogDao;
import com.yihu.hos.datacollect.model.*;
import com.yihu.hos.resource.service.StdService;
import com.yihu.hos.web.framework.constant.DateConvert;
import com.yihu.hos.web.framework.constant.SqlConstants;
import com.yihu.hos.web.framework.model.ActionResult;
import com.yihu.hos.web.framework.util.GridFSUtil;
import org.bson.types.ObjectId;
import org.dom4j.Document;
import org.dom4j.Element;
import org.dom4j.io.SAXReader;
import org.json.JSONArray;
import org.json.JSONObject;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import javax.annotation.Resource;
import java.io.ByteArrayInputStream;
import java.sql.Blob;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.*;
/**
 * 数据采集执行服务
 */
@Service(Services.DatacollectService)
public class DatacollectService  {
    static private final Logger logger = LoggerFactory.getLogger(DatacollectService.class);
    public static final String BEAN_ID = Services.DatacollectService;
    MongodbHelper mongoOrigin = new MongodbHelper("origin");
    MongodbHelper mongo = new MongodbHelper();
    String dateFormat = "yyyy-MM-dd HH:mm:ss"; //默认时间字符串格式
    int maxNum = 1000; //查询条数限制
    @Resource(name = Services.Datacollect)
    private DatacollectManager datacollect;
    @Resource(name = StdService.BEAN_ID)
    private StdService stdService;
    @Resource(name = DatacollectDao.BEAN_ID)
    private DatacollectDao datacollectDao;
    @Resource(name = DatacollectLogDao.BEAN_ID)
    private DatacollectLogDao datacollectLogDao;
    @Autowired
    private ObjectMapper objectMapper;
    /**
     * 根据连接字符串获取数据库类型
     */
    private static DBType getDbType(String uri) {
        return uri.startsWith("jdbc:mysql") ? DBType.Mysql : (uri.startsWith("jdbc:oracle") ? DBType.Oracle : (uri.startsWith("jdbc:hive2") ? DBType.Hive : (uri.startsWith("jdbc:microsoft:sqlserver") ? DBType.Sqlserver : DBType.Mysql)));
    }
//    public static void main(String[] args) throws Exception {
//        //namespace是命名空间,methodName是方法名
//        String sql = "select count(1) as COUNT,max(to_number(HDSD03_01_031)) as MAX_KEYVALUE from HDSC01_02 where 1=1 order by to_number(HDSD03_01_031)";
//        //调用web Service//输出调用结果
//        System.out.println(WebserviceUtil.request("http://172.19.103.71:8080/service/sql?wsdl", "ExcuteSQL", new Object[]{"", sql}));
//
//    }
    /**
     * 执行任务
     */
    public void executeJob(String jobId) throws Exception {
        //获取任务详细信息
        RsJobConfig job = datacollect.getJobById(jobId);
        RsJobLog log = new RsJobLog();
        log.setJobId(jobId);
        log.setJobStartTime(new Date());
        datacollectLogDao.saveEntity(log);
        String logId = log.getId();
        logger.info("任务" + jobId + "开始采集,新增日志" + logId + "。");
        StringBuilder logStr = new StringBuilder();
        int count = 0;
        int success = 0;
        try {
            String schemeVersion = job.getSchemeVersion();
            //获取任务相关数据集
            List<DtoJobDataset> list = datacollectDao.getDatacollectDataset(jobId);
            logger.info("获取任务相关数据集,数量" + list.size() + "。");
            if (list != null && list.size() > 0) {
                count = list.size();
                logStr.append("/*********** 开始采集 *******************/\n");
                //遍历数据集
                for (DtoJobDataset ds : list) {
                    try {
                        String type = ds.getType();
                        String message = "";
                        logStr.append(DateConvert.toString(new Date(), dateFormat) + " " + ds.getJobDatasetName());
                        if (type != null) {
                            if (type.equals("1")) //Web Service
                            {
                                message = collectWebservice(ds, schemeVersion, logId) + "\n";
                            } else if (type.equals("2"))//文件系统
                            {
                                message = "文件系统采集。\n";
                            } else { //数据库
                                message = collectTable(ds, schemeVersion, logId) + "\n";
                            }
                        } else {
                            message = ds.getJobDatasetName() + "未关联数据源!\n";
                        }
                        logger.info(message); //文本日志
                        logStr.append(message);
                        success++;
                    } catch (Exception ex) {
                        logger.info("异常:" + ex.getMessage());
                        logStr.append(ex.getMessage() + "\n");
                    }
                }
                logStr.append("/*********** 结束采集 *******************/\n");
            }
        } catch (Exception ex) {
            ex.printStackTrace();
            logger.info("异常:" + ex.getMessage());
            logStr.append(ex.getMessage() + "\n");
            logStr.append("/*********** 出现异常,中断采集 *******************/\n");
        }
        //任务主日志成功
        String jobContent = logStr.toString().replace("\"", "\\\"");
        if (jobContent.length() > 4000) {
            jobContent = jobContent.substring(0, 4000);
        }
        log.setJobContent(jobContent);
        log.setJobEndTime(new Date());
        log.setJobDatasetCount(count);
        log.setJobDatasetSuccess(success);
        logger.info("任务结束," + count + "个数据集成功采集" + success + "个。");
        datacollectLogDao.updateEntity(log);
    }
    /**
     * 根据日志详细补采数据
     */
    @Transactional
    public ActionResult repeatJob(String id) throws Exception {
        RsJobLogDetail log = datacollectLogDao.getEntity(RsJobLogDetail.class, id);
        if (log.getJobStatus().equals("2")) {
            return new ActionResult(false, "数据补采中!");
        }
        if (!log.getJobStatus().equals("0")) {
            return new ActionResult(false, "数据无需补采!");
        }
        try {
            log.setRepeatStartTime(new Date());
            log.setJobStatus("2"); //设置采集中状态
            datacollectLogDao.updateEntity(log);
        } catch (Exception e) {
            return new ActionResult(false, "补采失败!");
        }
        log.setJobStatus("0");
        datacollectLogDao.updateEntity(log);
        String stdDatasetCode = log.getStdDatasetCode();
        String sql = log.getJobSql();
        //数据库连接
        String datasourceId = log.getDatasourceId();
        String config = log.getConfig();
        DBHelper db = new DBHelper(datasourceId, config);
        //获取数据集字段映射结构
        String schemeVersion = log.getSchemeVersion();
        String datasetId = log.getJobDatasetId();
        List colString = stdService.getDatacolByScheme(schemeVersion, datasetId);
        JSONArray colList = new JSONArray(colString);
        List<JSONObject> list = db.query(sql);
        String message = intoMongodb(list, schemeVersion, stdDatasetCode, colList);
        if (message.length() > 0 || db.errorMessage.length() > 0) {
            log.setJobStatus("0");
            log.setRepeatEndTime(new Date());
            if (message.length() > 0) {
                log.setRepeatJobContent(message);
            } else {
                db.errorMessage.length();
            }
            datacollectLogDao.updateEntity(log);
            return new ActionResult(false, "补采失败!");
        } else {
            log.setJobStatus("3");
            log.setRepeatEndTime(new Date());
            log.setRepeatJobContent("补采成功!");
            datacollectLogDao.updateEntity(log);
            return new ActionResult(true, "补采成功!");
        }
    }
    /**
     * 根据数据库类型获取时间sql
     *
     * @return
     */
    private String getDateSqlByDBType(DBType dbType, Date date) throws Exception {
        String val = DateConvert.toString(date, dateFormat);
        if (dbType.equals(DBType.Mysql)) {
            return "date_format(\'" + val + "\',\'" + dateFormat + "\')";
        } else if (dbType.equals(DBType.Oracle)) {
            return "to_date(\'" + val + "\',\'" + dateFormat + "\')";
        } else {
            return val;
        }
    }
    /**
     * 根据数据库类型获取转换数值型sql
     */
    private String getToNumberSqlByDBType(DBType dbType, String key) throws Exception {
        if (dbType.equals(DBType.Mysql)) {
            return "cast(" + key + " as signed integer)";
        } else if (dbType.equals(DBType.Oracle)) {
            return "to_number(" + key + ")";
        } else {
            return key;
        }
    }
    /**
     * 根据数据库类型获取分页sql
     *
     * @return
     */
    private String getPageSqlByDBType(DBType dbType, String sql, int start, int rows) throws Exception {
        if (dbType.equals(DBType.Mysql)) {
            return sql + " LIMIT " + start + "," + rows;
        } else if (dbType.equals(DBType.Oracle)) {
            return " select * from (select t.*,ROWNUM RSCOM_RN from (" + sql + ") t where ROWNUM<" + (start + rows + 1) + ") where RSCOM_RN>= " + (start + 1);
        } else {
            return sql;
        }
    }
    /**
     * 字典全转换成中文
     */
    private List<JSONObject> translateDictCN(List<JSONObject> list, JSONArray colList, String schemeVersion) throws Exception {
        //获取字典列表
        List<DtoDictCol> dictColList = new ArrayList<>();
        for (int i = 0; i < colList.length(); i++) {
            JSONObject col = colList.getJSONObject(i);
            String dictId = col.optString("adapterDictId");
            if (dictId != null && dictId.length() > 0) {
                String dictType = col.optString("adapterDataType");
                String stdMetadataCode = col.optString("stdMetadataCode");
                DtoDictCol dictCol = new DtoDictCol();
                dictCol.setStdMetadataCode(stdMetadataCode);
                dictCol.setStdDictId(dictId);
                dictCol.setAdapterDataType(dictType.length() > 0 ? dictType : "1");//默认通过code转换字典
                //获取字典数据
                List dictString = stdService.getDictByScheme(schemeVersion, dictId);
                JSONArray dictAdapterArray = new JSONArray(dictString);
                dictCol.setDictList(dictAdapterArray);
                dictColList.add(dictCol);
            }
        }
        //翻译列表
        for (JSONObject data : list) {
            //遍历字典字段
            for (DtoDictCol col : dictColList) {
                String colNmae = col.getStdMetadataCode();
                String oldValue = data.optString(colNmae);
                String newValue = translateDictValueCN(oldValue, col.getAdapterDataType(), col.getDictList());
                if (newValue != null && newValue.length() > 0) {
                    data.put(colNmae, newValue);
                }
            }
        }
        return list;
    }
    /**
     * 转译字典成中文
     *
     * @return
     */
    private String translateDictValueCN(String oldValue, String type, JSONArray dictAdapterList) throws Exception {
        if (type.equals("0")) //原本就是值
        {
            return oldValue;
        }
        //遍历字典数据(编码->名称)
        for (int i = 0; i < dictAdapterList.length(); i++) {
            JSONObject dictItem = dictAdapterList.getJSONObject(i);
            if (oldValue != null && dictItem.has("stdEntryCode")) {
                if (oldValue.equals(dictItem.getString("stdEntryCode"))) {
                    String newValue = dictItem.getString("stdEntryValue"); //名称
                    return newValue;
                }
            }
        }
        return oldValue;
    }
    /**
     * 字典转换
     *
     * @param list
     * @param colList
     * @return
     * @throws Exception
     */
    private List<JSONObject> translateDict(List<JSONObject> list, JSONArray colList, String schemeVersion) throws Exception {
        //获取字典列表
        List<DtoDictCol> dictColList = new ArrayList<>();
        for (int i = 0; i < colList.length(); i++) {
            JSONObject col = colList.getJSONObject(i);
            String dictId = col.optString("adapterDictId");
            if (dictId != null && dictId.length() > 0) {
                String dictType = col.optString("adapterDataType");
                String stdMetadataCode = col.optString("stdMetadataCode");
                DtoDictCol dictCol = new DtoDictCol();
                dictCol.setStdMetadataCode(stdMetadataCode);
                dictCol.setStdDictId(dictId);
                dictCol.setAdapterDataType(dictType.length() > 0 ? dictType : "1");//默认通过code转换字典
                //获取字典数据
                List dictString = stdService.getDictByScheme(schemeVersion, dictId);
                JSONArray dictAdapterArray = new JSONArray(dictString);
                dictCol.setDictList(dictAdapterArray);
                dictColList.add(dictCol);
            }
        }
        //翻译列表
        for (JSONObject data : list) {
            //遍历字典字段
            for (DtoDictCol col : dictColList) {
                String colNmae = col.getStdMetadataCode();
                String oldValue = data.optString(colNmae);
                String newValue = translateDictValue(oldValue, col.getAdapterDataType(), col.getDictList());
                if (newValue != null && newValue.length() > 0) {
                    data.put(colNmae, newValue);
                }
            }
        }
        return list;
    }
    /**
     * 转译字典
     *
     * @return
     */
    private String translateDictValue(String oldValue, String type, JSONArray dictAdapterList) throws Exception {
        //应用标准字段
        String colName = "adapterEntryCode";
        if (type.equals("0")) //通过name转译
        {
            colName = "adapterEntryValue";
        }
        //遍历字典数据
        for (int i = 0; i < dictAdapterList.length(); i++) {
            JSONObject dictItem = dictAdapterList.getJSONObject(i);
            if (oldValue != null && dictItem.has(colName)) {
                if (oldValue.equals(dictItem.getString(colName))) {
                    String newValue = dictItem.getString("stdEntryCode");
                    return newValue;
                }
            }
        }
        //找不到适配字典数据则返回空
        return "";
    }
    /**
     * 获取过滤条件
     *
     * @return
     */
    private String getCondition(DBType dbType, String conditionString) {
        JSONArray array = new JSONArray(conditionString);
        if (array != null && array.length() > 0) {
            List<QueryCondition> conditions = new ArrayList<>();
            for (Object item : array) {
                JSONObject obj = (JSONObject) item;
                String logical = obj.getString("andOr");
                String operation = obj.getString("condition");
                String field = obj.getString("field");
                String keyword = obj.getString("value");
                conditions.add(new QueryCondition(logical, operation, field, keyword));
            }
            //条件语句转换
            ParserSql ps;
            switch (dbType) {
                case Oracle:
                    ps = new ParserOracle();
                    break;
                case Sqlserver:
                    ps = new ParserSqlserver();
                    break;
                default:
                    ps = new ParserMysql();
            }
            return ps.getConditionSql(conditions);
        }
        return "";
    }
    /**
     * 获取条件SQL
     *
     * @param dbType
     * @param conditionString
     * @return
     * @throws ParseException
     */
    private String getConditionSql(DBType dbType, String conditionString) throws ParseException {
        String conditionSql = "";
        JSONArray conditions = new JSONArray(conditionString);
        Iterator iterator = conditions.iterator();
        while (iterator.hasNext()) {
            JSONObject condition = (JSONObject) iterator.next();
            String logic = condition.getString("condition");
            String andOr = condition.getString("andOr");
            String field = condition.getString("field");
            String value = condition.getString("value");
            String fieldType = condition.getString("type");
            String keys = "";
            if (andOr.equals(" AND ")) {
                conditionSql = conditionSql + " and ";
            } else {
                conditionSql = conditionSql + " or ";
            }
            if (logic.equals(" IN ") || logic.equals(" NOT IN ")) {
                String[] keywords = value.split(",");
                for (String key : keywords) {
                    keys += "'" + key + "',";
                }
                keys = " (" + keys.substring(0, keys.length() - 1) + ") ";
            } else if (logic.equals(" LIKE ")) {
                keys += " '%" + value + "%' ";
            } else {
                if (fieldType.equals("DATE")) {
                    keys += getDateFormatSql(dbType, value);
                } else {
                    keys += " '" + value + "' ";
                }
            }
            conditionSql += field + logic + keys;
        }
        return conditionSql;
    }
    /**
     * 获取对应数据库时间格式
     *
     * @param dbType
     * @param key
     * @return
     * @throws ParseException
     */
    private String getDateFormatSql(DBType dbType, String key) throws ParseException {
        String dateFormat = "yyyy-MM-dd HH:mm:ss";
        SimpleDateFormat formatDate = new SimpleDateFormat("yyyy-MM-dd");
        Date d = formatDate.parse(key);
        SimpleDateFormat format = new SimpleDateFormat(dateFormat);
        switch (dbType) {
            case Oracle:
                key = "to_date(\'" + format.format(d) + "\',\'YYYY-MM-DD HH24:MI:SS\')";
                break;
            case Sqlserver:
                break;
            default:
                key = "date_format(\'" + format.format(d) + "\',\'%y-%m-%d %T\')";
        }
        return key;
    }
    /**
     * 采集入库
     *
     * @return
     */
    private String intoMongodb(List<JSONObject> list, String schemeVersion, String stdDatasetCode, JSONArray colList) {
        String patientIdCode = SqlConstants.PATIENT_ID.toUpperCase();
        String eventNoCode = SqlConstants.EVENT_NO.toUpperCase();
        PatientIdentity patientIdentity = SysConfig.getInstance().getPatientIdentity(stdDatasetCode);
        if (patientIdentity != null) {
            patientIdCode = patientIdentity.getPatientIDCode();
            eventNoCode = patientIdentity.getEventNoCode();
        }
        try {
            if (!mongo.createIndex(stdDatasetCode, "patientIndex", patientIdCode, eventNoCode)) {
                return "Mongodb索引创建失败!(表:" + stdDatasetCode + ")";
            }
            if (list != null && list.size() > 0) {
                //字典未转换前采集到原始库
                boolean b = mongoOrigin.insert(stdDatasetCode, translateDictCN(list, colList, schemeVersion));
                //字典转换
                list = translateDict(list, colList, schemeVersion);
                //采集到mongodb
                b = mongo.insert(stdDatasetCode, list);
                if (!b) {
                    if (mongo.errorMessage != null && mongo.errorMessage.length() > 0) {
                        logger.debug(mongo.errorMessage);
                        return mongo.errorMessage;
                    } else {
                        return "Mongodb保存失败!(表:" + stdDatasetCode + ")";
                    }
                }
            }
        } catch (Exception e) {
            return e.getMessage();
        }
        return "";
    }
    /**
     * 数据库表采集
     *
     * @return
     */
    private String collectTable(DtoJobDataset ds, String schemeVersion, String logId) throws Exception {
        String message = "";
        String datasetId = ds.getJobDatasetId();
        String jobDatasetName = ds.getJobDatasetName();
        String condition = ds.getJobDatasetCondition();
        String key = ds.getJobDatasetKey();
        String keytype = ds.getJobDatasetKeytype();
        String keyvalue = ds.getJobDatasetKeyvalue();
        String orgCode = ds.getOrgCode();
        String datasourceId = ds.getDatasourceId();
        String config = ds.getConfig(); //数据库连接
        DBHelper db = new DBHelper(datasourceId, config);
        DBType dbType = db.dbType;
        //获取数据集映射
        List datasetString = stdService.getDatasetByScheme(schemeVersion, datasetId);
        JSONArray datasetList = new JSONArray(datasetString);
        if (datasetList != null && datasetList.length() > 0) {
            String stdTableName = datasetList.getJSONObject(0).optString("stdDatasetCode");
            String adapterTableName = datasetList.getJSONObject(0).optString("adapterDatasetCode");
            //获取数据集字段映射结构
            List colString = stdService.getDatacolByScheme(schemeVersion, datasetId);
            JSONArray colList = new JSONArray(colString);
            if (colList != null && colList.length() > 0) {
                //拼接查询sql
                String strSql = "Select '" + orgCode + "' as RSCOM_ORG_CODE";
                for (int i = 0; i < colList.length(); i++) {
                    JSONObject col = colList.getJSONObject(i);
                    String adapterMetadataCode = col.optString("adapterMetadataCode");
                    if (adapterMetadataCode.length() > 0) {
                        strSql += "," + adapterMetadataCode + " as " + col.optString("stdMetadataCode");
                    }
                }
                strSql += " from " + adapterTableName;
                String strWhere = " where 1=1";
                //采集范围
                if (condition != null && condition.length() > 0) {
                    strWhere += getConditionSql(dbType, condition);
                }
                //增量采集
                String maxKey = "0";
                if (key != null && key.length() > 0) {
                    maxKey = key;
                    if (keytype.toUpperCase().equals("DATE")) //时间类型
                    {
                        if (keyvalue != null && keyvalue.length() > 0) {
                            Date keyDate = new Date();
                            //字符串转时间
                            keyDate = DateConvert.toDate(keyvalue);
                            //根据数据库类型获取时间sql
                            strWhere += " and " + maxKey + ">'" + getDateSqlByDBType(dbType, keyDate) + "'";
                        }
                    } else if (keytype.toUpperCase().equals("VARCHAR")) //字符串类型
                    {
                        maxKey = getToNumberSqlByDBType(dbType, key);
                        if (keyvalue != null && keyvalue.length() > 0) {
                            strWhere += " and " + maxKey + ">'" + keyvalue + "'";
                        }
                    } else {
                        if (keyvalue != null && keyvalue.length() > 0) {
                            strWhere += " and " + maxKey + ">'" + keyvalue + "'";
                        }
                    }
                    strWhere += " order by " + maxKey;
                }
                strSql += strWhere;
                //总条数
                String sqlCount = "select count(1) as COUNT from (" + strSql + ")";
                String sqlMax = "select max(" + maxKey + ") as MAX_KEYVALUE from " + adapterTableName + strWhere;
                JSONObject objCount = db.load(sqlCount);
                if (objCount == null) {
                    if (db.errorMessage.length() > 0) {
                        throw new Exception(db.errorMessage);
                    } else {
                        throw new Exception("查询异常:" + sqlCount);
                    }
                } else {
                    int count = objCount.getInt("COUNT");
                    if (count == 0) //0条记录,无需采集
                    {
                        message = "0条记录,无需采集。";
                    } else {
                        //获取最大值
                        JSONObject objMax = db.load(sqlMax);
                        int successCount = 0;
                        String maxKeyvalue = objMax.optString("MAX_KEYVALUE");
                        //修改最大值
                        if (maxKeyvalue != null && maxKeyvalue.length() > 0) {
                            datacollectLogDao.updateJobDatasetKeyvalue(ds.getId(), maxKeyvalue);
                            logger.info("修改任务数据集最大值为" + maxKeyvalue + "。"); //文本日志
                        }
                        int countPage = 1;
                        if (count > maxNum) //分页采集
                        {
                            countPage = count / maxNum + 1;
                        }
                        for (int i = 0; i < countPage; i++) {
                            int rows = maxNum;
                            if (i + 1 == countPage) {
                                rows = count - i * maxNum;
                            }
                            String sql = getPageSqlByDBType(dbType, strSql, i * maxNum, rows); //获取分页sql语句
                            RsJobLogDetail detail = new RsJobLogDetail();
                            detail.setStartTime(new Date());
                            detail.setJobLogId(logId);
                            detail.setDatasourceId(datasourceId);
                            detail.setConfig(config);
                            detail.setStdDatasetCode(stdTableName);
                            detail.setJobDatasetId(datasetId);
                            detail.setJobDatasetName(ds.getJobDatasetName());
                            detail.setJobId(ds.getJobId());
                            detail.setJobSql(sql);
                            detail.setJobNum(i + 1);
                            detail.setJobDatasetRows(rows);
                            detail.setSchemeVersion(schemeVersion);
                            List<JSONObject> list = db.query(sql);
                            String msg = "";
                            if (list != null) {
                                msg = intoMongodb(list, schemeVersion, stdTableName, colList); //返回信息
                            } else {
                                if (db.errorMessage.length() > 0) {
                                    msg = db.errorMessage;
                                } else {
                                    msg = "查询数据为空!";
                                }
                            }
                            if (msg.length() > 0) {
                                //任务日志细表异常操作
                                detail.setJobStatus("0");
                                detail.setJobContent(msg);
                                logger.info(msg); //文本日志
                            } else {
                                detail.setJobStatus("1");
                                detail.setJobContent("采集成功!");
                                successCount += rows;
                            }
                            detail.setEndTime(new Date());
                            datacollectLogDao.saveEntity(detail);
                        }
                        message = jobDatasetName + "采集成功" + successCount + "条数据,总条数" + count + "条。";
                    }
                }
            } else {
                throw new Exception(jobDatasetName + "数据集字段映射为空!");
            }
        } else {
            throw new Exception(jobDatasetName + "数据集映射为空!");
        }
        logger.info(message);
        return message;
    }
    /**
     * XML转JSONList
     *
     * @return
     */
    private List<JSONObject> getListFromXml(String xml) throws Exception {
        SAXReader reader = new SAXReader();
        Document doc = reader.read(new ByteArrayInputStream(xml.getBytes("UTF-8")));
        Element root = doc.getRootElement();
        List<JSONObject> re = new ArrayList<>();
        //xml数据列表
        Iterator iter = root.elementIterator("Data");
        while (iter.hasNext()) {
            JSONObject obj = new JSONObject();
            Element el = (Element) iter.next();
            Iterator cols = el.elementIterator();
            while (cols.hasNext()) {
                Element col = (Element) cols.next();
                obj.put(col.getName().toUpperCase(), col.getStringValue());
            }
            re.add(obj);
        }
        return re;
    }
    /**
     * webservice采集
     *
     * @return
     */
    private String collectWebservice(DtoJobDataset ds, String schemeVersion, String logId) throws Exception {
        String message = "";
        String datasetId = ds.getJobDatasetId();
        String jobDatasetName = ds.getJobDatasetName();
        String condition = ds.getJobDatasetCondition();
        String key = ds.getJobDatasetKey();
        String keytype = ds.getJobDatasetKeytype();
        String keyvalue = ds.getJobDatasetKeyvalue();
        String orgCode = ds.getOrgCode();
        String datasourceId = ds.getDatasourceId();
        String config = ds.getConfig(); //数据库连接
        DBType dbType = DBType.Oracle;//********** 先定死Oracle ****************************
        //webservice地址
        Map<String, String> mapConfig = objectMapper.readValue(config, Map.class);
        if (mapConfig.containsKey("protocol") && mapConfig.containsKey("url")) {
            String url = mapConfig.get("protocol") + "://" + mapConfig.get("url");
            //获取数据集映射
            List datasetString = stdService.getDatasetByScheme(schemeVersion, datasetId);
            JSONArray datasetList = new JSONArray(datasetString);
            if (datasetList != null && datasetList.length() > 0) {
                String stdTableName = datasetList.getJSONObject(0).optString("stdDatasetCode");
                String adapterTableName = datasetList.getJSONObject(0).optString("adapterDatasetCode");
                //获取数据集字段映射结构
                List colString = stdService.getDatacolByScheme(schemeVersion, datasetId);
                JSONArray colList = new JSONArray(colString);
                if (colList != null && colList.length() > 0) {
                    //拼接查询sql
                    String strSql = "Select '" + orgCode + "' as RSCOM_ORG_CODE";
                    for (int i = 0; i < colList.length(); i++) {
                        JSONObject col = colList.getJSONObject(i);
                        String adapterMetadataCode = col.optString("adapterMetadataCode");
                        if (adapterMetadataCode.length() > 0) {
                            strSql += "," + adapterMetadataCode + " as " + col.optString("stdMetadataCode");
                        }
                    }
                    strSql += " from " + adapterTableName;
                    String strWhere = " where 1=1";
                    //采集范围
                    if (condition != null && condition.length() > 0) {
                        strWhere += getConditionSql(dbType, condition);
                    }
                    //增量采集
                    String maxKey = "0";
                    String keyValue = ds.getJobDatasetKeyvalue();
                    if (key != null && key.length() > 0) {
                        maxKey = key;
                        if (keytype.toUpperCase().equals("DATE")) //时间类型
                        {
                            Date keyDate = new Date();
                            if (keyvalue != null && keyvalue.length() > 0) {
                                //字符串转时间
                                keyDate = DateConvert.toDate(keyvalue);
                                //根据数据库类型获取时间sql
                                strWhere += " and " + key + ">'" + getDateSqlByDBType(dbType, keyDate) + "'";
                                strWhere += " order by " + key;
                            }
                        } else if (keytype.toUpperCase().equals("VARCHAR")) //字符串类型
                        {
                            maxKey = getToNumberSqlByDBType(dbType, key);
                            if (keyvalue != null && keyvalue.length() > 0) {
                                strWhere += " and " + maxKey + ">'" + keyvalue + "'";
                                strWhere += " order by " + maxKey;
                            }
                        } else {
                            if (keyvalue != null && keyvalue.length() > 0) {
                                strWhere += " and " + key + ">'" + keyvalue + "'";
                                strWhere += " order by " + key;
                            }
                        }
                    }
                    strSql += strWhere;
                    //总条数和最大值查询
                    String sqlCount = "select count(1) as COUNT from (" + strSql + ")";
                    String sqlMax = "select max(" + maxKey + ") as MAX_KEYVALUE from " + adapterTableName + strWhere;
                    //webservice获取数据总条数
                    String strCount = "";//WebserviceUtil.request(url, "ExcuteSQL", new Object[]{"", sqlCount});
                    List<JSONObject> dataCount = getListFromXml(strCount);
                    if (dataCount != null && dataCount.size() > 0) {
                        Integer count = Integer.parseInt(dataCount.get(0).getString("COUNT"));
                        if (count == 0) //0条记录,无需采集
                        {
                            message = "0条记录,无需采集。";
                        } else {
                            //webservice获取最大值
                            String strMax = ""; //WebserviceUtil.request(url, "ExcuteSQL", new Object[]{"", sqlMax});
                            List<JSONObject> dataMax = getListFromXml(strCount);
                            int successCount = 0;
                            String maxKeyvalue = dataMax.get(0).getString("MAX_KEYVALUE");
                            //修改最大值
                            if (maxKeyvalue != null && maxKeyvalue.length() > 0) {
                                datacollectLogDao.updateJobDatasetKeyvalue(ds.getId(), maxKeyvalue);
                                logger.info("修改任务数据集最大值为" + maxKeyvalue + "。"); //文本日志
                            }
                            int countPage = 1;
                            if (count > maxNum) //分页采集
                            {
                                countPage = count / maxNum + 1;
                            }
                            for (int i = 0; i < countPage; i++) {
                                int rows = maxNum;
                                if (i + 1 == countPage) {
                                    rows = count - i * maxNum;
                                }
                                String sql = getPageSqlByDBType(dbType, strSql, i * maxNum, rows); //获取分页sql语句
                                RsJobLogDetail detail = new RsJobLogDetail();
                                detail.setStartTime(new Date());
                                detail.setJobLogId(logId);
                                detail.setDatasourceId(datasourceId);
                                detail.setConfig(config);
                                detail.setStdDatasetCode(stdTableName);
                                detail.setJobDatasetId(datasetId);
                                detail.setJobDatasetName(ds.getJobDatasetName());
                                detail.setJobId(ds.getJobId());
                                detail.setJobSql(sql);
                                detail.setJobNum(i + 1);
                                detail.setJobDatasetRows(rows);
                                detail.setSchemeVersion(schemeVersion);
                                String msg = "";
                                try {
                                    //获取分页数据
                                    String strList = ""; //WebserviceUtil.request(url, "ExcuteSQL", new Object[]{"", sql});
                                    List<JSONObject> list = getListFromXml(strList);
                                    if (list != null) {
                                        msg = intoMongodb(list, schemeVersion, stdTableName, colList); //返回信息
                                    } else {
                                        msg = "查询数据为空!";
                                    }
                                    if (msg.length() > 0) {
                                        //任务日志细表异常操作
                                        detail.setJobStatus("0");
                                        detail.setJobContent(msg);
                                        logger.info(msg); //文本日志
                                    } else {
                                        detail.setJobStatus("1");
                                        detail.setJobContent("采集成功!");
                                        successCount += rows;
                                    }
                                } catch (Exception ex) {
                                    msg = ex.getMessage();
                                }
                                detail.setEndTime(new Date());
                                datacollectLogDao.saveEntity(detail);
                            }
                            message = jobDatasetName + "采集成功" + successCount + "条数据,总条数" + count + "条。";
                        }
                    }
                } else {
                    throw new Exception(jobDatasetName + "数据集字段映射为空!");
                }
            } else {
                throw new Exception(jobDatasetName + "数据集映射为空!");
            }
        } else {
            throw new Exception("非法webservice路径!");
        }
        logger.info(message);
        return message;
    }
    /**
     * 采集入库(包含blob字段处理)
     * @return
     */
    private String intoMongodb2(List<JSONObject> list,String schemeVersion,String stdDatasetCode,JSONArray colList)
    {
        String patientIdCode = SqlConstants.PATIENT_ID.toUpperCase();
        String eventNoCode = SqlConstants.EVENT_NO.toUpperCase();
        PatientIdentity patientIdentity = SysConfig.getInstance().getPatientIdentity(stdDatasetCode);
        if (patientIdentity != null) {
            patientIdCode = patientIdentity.getPatientIDCode();
            eventNoCode = patientIdentity.getEventNoCode();
        }
        try{
            if(!mongo.createIndex(stdDatasetCode, "patientIndex", patientIdCode, eventNoCode)) {
                return "Mongodb索引创建失败!(表:"+stdDatasetCode+")";
            }
            if(list!=null && list.size()>0)
            {
                //TODO TOSET 判断是否是非结构化数据集
                if ("unstructured".equals(stdDatasetCode)){
                    for (JSONObject jsonObject:list) {
                        //文件内容保存到GridFS,细表内容字段保存为文件objctId
                        Blob blob = (Blob) jsonObject.get("CONTENT");
                        String type = (String) jsonObject.get("FILE_TYPE");
                        String patientId=  (String) jsonObject.get("patient_id");
                        String eventNo=  (String) jsonObject.get("event_no");
                        Map<String,Object> params = new HashMap<>();
                        params.put("patient_id",patientId);
                        params.put("event_no",eventNo);
                        try {
                            ObjectId objectId = GridFSUtil.uploadFile( blob, type, params);
                            jsonObject.put("CONTENT", objectId);
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }
                }
                //字典未转换前采集到原始库
                boolean b = mongoOrigin.insert(stdDatasetCode,translateDictCN(list, colList,schemeVersion));
                //字典转换
                list = translateDict(list, colList,schemeVersion);
                //采集到mongodb
                b = mongo.insert(stdDatasetCode,list);
                if(!b)
                {
                    if(mongo.errorMessage!=null && mongo.errorMessage.length()>0)
                    {
                        System.out.print(mongo.errorMessage);
                        return mongo.errorMessage;
                    }
                    else {
                        return "Mongodb保存失败!(表:"+stdDatasetCode+")";
                    }
                }
            }
        }
        catch (Exception e)
        {
            return e.getMessage();
        }
        return "";
    }
    /**
     * 数据库采集(包含Blob类型数据)
     * @param ds
     * @param schemeVersion
     * @param logId
     * @return
     * @throws Exception
     */
    private String collectBlobTable(DtoJobDataset ds,String schemeVersion,String logId) throws Exception
    {
        String message = "";
        String datasetId = ds.getJobDatasetId();
        String jobDatasetName = ds.getJobDatasetName();
        String condition=ds.getJobDatasetCondition();
        String key=ds.getJobDatasetKey();
        String keytype=ds.getJobDatasetKeytype();
        String keyvalue=ds.getJobDatasetKeyvalue();
        String orgCode = ds.getOrgCode();
        String datasourceId = ds.getDatasourceId();
        String config = ds.getConfig(); //数据库连接
        DBHelper db = new DBHelper(datasourceId,config);
        DBType dbType = db.dbType;
        //获取数据集映射
        List datasetString = stdService.getDatasetByScheme(schemeVersion, datasetId);
        JSONArray datasetList = new JSONArray(datasetString);
        if(datasetList!=null &&datasetList.length()>0)
        {
            String stdTableName = datasetList.getJSONObject(0).optString("stdDatasetCode");
            String adapterTableName = datasetList.getJSONObject(0).optString("adapterDatasetCode");
            //获取数据集字段映射结构
            List colString = stdService.getDatacolByScheme(schemeVersion,datasetId);
            JSONArray colList = new JSONArray(colString);
            if(colList!=null && colList.length()>0)
            {
                //拼接查询sql
                String strSql = "Select '" + orgCode +"' as RSCOM_ORG_CODE";
                for(int i=0; i< colList.length();i++)
                {
                    JSONObject col = colList.getJSONObject(i);
                    String adapterMetadataCode = col.optString("adapterMetadataCode");
                    if(adapterMetadataCode.length()>0)
                    {
                        strSql+= ","+adapterMetadataCode +" as " + col.optString("stdMetadataCode") ;
                    }
                }
                strSql += " from " +adapterTableName;
                String strWhere = " where 1=1";
                //采集范围
                if(condition!=null && condition.length()>0)
                {
                    strWhere += getConditionSql(dbType,condition);
                }
                //增量采集
                String maxKey = "0";
                if(key!=null && key.length()>0)
                {
                    maxKey = key;
                    if(keytype.toUpperCase().equals("DATE")) //时间类型
                    {
                        if(keyvalue!=null && keyvalue.length()>0) {
                            Date keyDate = new Date();
                            //字符串转时间
                            keyDate = DateConvert.toDate(keyvalue);
                            //根据数据库类型获取时间sql
                            strWhere += " and "+ maxKey + ">'"+getDateSqlByDBType(dbType,keyDate)+"'";
                        }
                    }
                    else if(keytype.toUpperCase().equals("VARCHAR")) //字符串类型
                    {
                        maxKey = getToNumberSqlByDBType(dbType,key);
                        if(keyvalue!=null && keyvalue.length()>0) {
                            strWhere += " and "+ maxKey + ">'" + keyvalue + "'";
                        }
                    }
                    else{
                        if(keyvalue!=null && keyvalue.length()>0) {
                            strWhere += " and "+ maxKey + ">'" + keyvalue + "'";
                        }
                    }
                    strWhere += " order by " + maxKey;
                }
                strSql += strWhere;
                //总条数
                String sqlCount = "select count(1) as COUNT from (" + strSql+")";
                String sqlMax = "select max(" + maxKey + ") as MAX_KEYVALUE from " + adapterTableName + strWhere;
                JSONObject objCount = db.load(sqlCount);
                if(objCount==null)
                {
                    if(db.errorMessage.length()>0)
                    {
                        throw new Exception(db.errorMessage);
                    }
                    else{
                        throw new Exception("查询异常:"+sqlCount);
                    }
                }
                else{
                    int count = objCount.getInt("COUNT");
                    if(count==0) //0条记录,无需采集
                    {
                        message = "0条记录,无需采集。";
                    }
                    else
                    {
                        //获取最大值
                        JSONObject objMax = db.load(sqlMax);
                        int successCount = 0;
                        String maxKeyvalue = objMax.optString("MAX_KEYVALUE");
                        //修改最大值
                        if(maxKeyvalue!=null&& maxKeyvalue.length()>0)
                        {
                            datacollectLogDao.updateJobDatasetKeyvalue(ds.getId(),maxKeyvalue);
                            logger.info("修改任务数据集最大值为"+maxKeyvalue+"。"); //文本日志
                        }
                        int countPage = 1;
                        if(count > maxNum) //分页采集
                        {
                            countPage = count/maxNum+1;
                        }
                        for(int i=0;i<countPage;i++)
                        {
                            int rows = maxNum;
                            if(i+1==countPage){
                                rows = count-i*maxNum;
                            }
                            String sql = getPageSqlByDBType(dbType,strSql,i*maxNum,rows); //获取分页sql语句
                            RsJobLogDetail detail = new RsJobLogDetail();
                            detail.setStartTime(new Date());
                            detail.setJobLogId(logId);
                            detail.setDatasourceId(datasourceId);
                            detail.setConfig(config);
                            detail.setStdDatasetCode(stdTableName);
                            detail.setJobDatasetId(datasetId);
                            detail.setJobDatasetName(ds.getJobDatasetName());
                            detail.setJobId(ds.getJobId());
                            detail.setJobSql(sql);
                            detail.setJobNum(i+1);
                            detail.setJobDatasetRows(rows);
                            detail.setSchemeVersion(schemeVersion);
                            List<JSONObject> list = db.query(sql);
                            String msg = "";
                            if(list!=null)
                            {
                                msg = intoMongodb2(list,schemeVersion,stdTableName,colList); //返回信息
                            }
                            else{
                                if(db.errorMessage.length()>0)
                                {
                                    msg = db.errorMessage;
                                }
                                else{
                                    msg = "查询数据为空!";
                                }
                            }
                            if(msg.length()>0)
                            {
                                //任务日志细表异常操作
                                detail.setJobStatus("0");
                                detail.setJobContent(msg);
                            }
                            else{
                                detail.setJobStatus("1");
                                detail.setJobContent("采集成功!");
                                successCount += rows;
                            }
                            detail.setEndTime(new Date());
                            datacollectLogDao.saveEntity(detail);
                        }
                        message = jobDatasetName + "采集成功"+successCount+"条数据,总条数"+count+"条。";
                    }
                }
            }
            else
            {
                throw new Exception(jobDatasetName + "数据集字段映射为空!");
            }
        }
        else{
            throw new Exception(jobDatasetName + "数据集映射为空!");
        }
        return message;
    }
    
}

+ 8 - 9
src/main/webapp/WEB-INF/ehr/jsp/common/indexJs.jsp

@ -62,10 +62,13 @@
            $(".l-layout-left").css({background: "#dce6f0"})
            var tenantManager = [
                {id: 200, text: '租户管理', icon: '${staticRoot}/images/index/menu5_icon.png'},
                {id: 201, pid: 200, text: '租户管理', url: '${contextRoot}/tenant/initial'},
            ]
            var tenantManager ;
            var userRole = localStorage.getItem("userRole");
            if(userRole=="admin"){
                //是管理中心用户,则添加租户管理模块
//                menu = menu.concat(tenantManager);
                tenantManager =  {id: 75, pid: 7, text: '租户管理', url: '${contextRoot}/tenant/initial'};
            }
            //菜单列表
            var menu = [
                //标准管理
@ -101,6 +104,7 @@
                {id: 37, pid: 3, text: '维度类别配置', url: '${contextRoot}/dimension/dimensioncatetory'},
                //用户安全中心
                {id: 7, text: '用户安全', icon: '${staticRoot}/images/index/menu5_icon.png'},
                tenantManager,
                {id: 71, pid: 7, text: '机构管理', url: '${contextRoot}/org/initial'},
                {id: 72, pid: 7, text: '用户管理', url: '${contextRoot}/user/initial'},
                {id: 73, pid: 7, text: '角色管理', url: '${contextRoot}/role/initial'},
@ -113,11 +117,6 @@
                {id: 94, pid: 9, text: '菜单按钮配置', url: '${contextRoot}/menu/menuAction/initial'},
                {id: 95, pid: 9, text: '数据源配置', url: '${contextRoot}/datasource/configSources'},
            ];
            var userRole = localStorage.getItem("userRole");
            if(userRole=="admin"){
                //是管理中心用户,则添加租户管理模块
                menu = menu.concat(tenantManager);
            }
            me.menuTree = $('#ulTree').ligerTree({
                data: menu,
                idFieldName: 'id',