|  | @ -12,15 +12,21 @@ import com.yihu.wlyy.figure.label.model.DataModel;
 | 
	
		
			
				|  |  | import com.yihu.wlyy.figure.label.model.SaveModel;
 | 
	
		
			
				|  |  | import com.yihu.wlyy.figure.label.service.JobService;
 | 
	
		
			
				|  |  | import com.yihu.wlyy.figure.label.storage.Store2ES;
 | 
	
		
			
				|  |  | import com.yihu.wlyy.figure.label.util.ConstantUtil;
 | 
	
		
			
				|  |  | import org.apache.commons.lang3.time.DateFormatUtils;
 | 
	
		
			
				|  |  | import org.quartz.*;
 | 
	
		
			
				|  |  | import org.slf4j.Logger;
 | 
	
		
			
				|  |  | import org.slf4j.LoggerFactory;
 | 
	
		
			
				|  |  | import org.springframework.beans.factory.annotation.Autowired;
 | 
	
		
			
				|  |  | import org.springframework.context.annotation.Scope;
 | 
	
		
			
				|  |  | import org.springframework.stereotype.Component;
 | 
	
		
			
				|  |  | import org.springframework.stereotype.Service;
 | 
	
		
			
				|  |  | import org.springframework.transaction.annotation.Transactional;
 | 
	
		
			
				|  |  | import org.springframework.util.CollectionUtils;
 | 
	
		
			
				|  |  | import org.springframework.util.StringUtils;
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  | import java.util.ArrayList;
 | 
	
		
			
				|  |  | import java.util.Date;
 | 
	
		
			
				|  |  | import java.util.List;
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  | /**
 | 
	
	
		
			
				|  | @ -68,6 +74,8 @@ public class Mysql2ESJob implements Job {
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  |     private String source;
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  |     private String datasource;
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  |     /**
 | 
	
		
			
				|  |  |      * 数据表的id,有些数据是按时间增量查询,有些数据是按表的主键id增量查询
 | 
	
		
			
				|  |  |      */
 | 
	
	
		
			
				|  | @ -82,30 +90,32 @@ public class Mysql2ESJob implements Job {
 | 
	
		
			
				|  |  |         //根据obconfig里配置的sql提取数据
 | 
	
		
			
				|  |  |         extract();
 | 
	
		
			
				|  |  |         //数据转换
 | 
	
		
			
				|  |  |         List<SaveModel> list = new ArrayList<>();
 | 
	
		
			
				|  |  |         list = convert();
 | 
	
		
			
				|  |  |         //数据保存
 | 
	
		
			
				|  |  |         boolean bool = save(list);
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  |         //增量存储成功后,修改增量的czrq时间为当前时间或主键id为上次获取到的数据的最后的一条的id
 | 
	
		
			
				|  |  |         if(StringUtils.endsWithIgnoreCase(this.flJobConfig.getSqlFieldType().toString(), JobSqlFieldTypeEnum.TIME.toString())){
 | 
	
		
			
				|  |  |             jobService.updateFieldValuetoCurrentTimeOrId(this.flJobConfigId,null,bool);
 | 
	
		
			
				|  |  |         }else if(StringUtils.endsWithIgnoreCase(this.flJobConfig.getSqlFieldType().toString(), JobSqlFieldTypeEnum.NUM.toString())){
 | 
	
		
			
				|  |  |             int index = dataModelList.size();
 | 
	
		
			
				|  |  |             lastDataId = dataModelList.get(index -1).getId();
 | 
	
		
			
				|  |  |             jobService.updateFieldValuetoCurrentTimeOrId(this.flJobConfigId,this.lastDataId,bool);
 | 
	
		
			
				|  |  |         List<SaveModel> list = convert();
 | 
	
		
			
				|  |  |         //转换后的结果
 | 
	
		
			
				|  |  |         if (!CollectionUtils.isEmpty(list)) {
 | 
	
		
			
				|  |  |             //数据保存
 | 
	
		
			
				|  |  |             boolean bool = save(list);
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  |             //增量存储成功后,修改增量的czrq时间为当前时间或主键id为上次获取到的数据的最后的一条的id
 | 
	
		
			
				|  |  |             if(StringUtils.endsWithIgnoreCase(this.flJobConfig.getSqlFieldType().toString(), JobSqlFieldTypeEnum.TIME.toString())){
 | 
	
		
			
				|  |  |                 jobService.updateFieldValuetoCurrentTimeOrId(this.flJobConfigId,null,bool);
 | 
	
		
			
				|  |  |             }else if(StringUtils.endsWithIgnoreCase(this.flJobConfig.getSqlFieldType().toString(), JobSqlFieldTypeEnum.NUM.toString())){
 | 
	
		
			
				|  |  |                 int index = dataModelList.size();
 | 
	
		
			
				|  |  |                 lastDataId = dataModelList.get(index -1).getId();
 | 
	
		
			
				|  |  |                 jobService.updateFieldValuetoCurrentTimeOrId(this.flJobConfigId,this.lastDataId,bool);
 | 
	
		
			
				|  |  |             }
 | 
	
		
			
				|  |  |         }
 | 
	
		
			
				|  |  |     }
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  |     public void initParams(JobDataMap paramsMap){
 | 
	
		
			
				|  |  |         this.flJobConfigId = (Long)paramsMap.get("jobConfig");
 | 
	
		
			
				|  |  |         this.sourceType = String.valueOf(paramsMap.get("sourceType"));
 | 
	
		
			
				|  |  |         this.source = String.valueOf(paramsMap.get("source"));
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  |         this.flJobConfig = flJobConfigDao.findById(this.flJobConfigId);
 | 
	
		
			
				|  |  |         this.sqlFiledValue = String.valueOf(paramsMap.get("sqlFiledValue"));
 | 
	
		
			
				|  |  |         this.datasource = flJobConfig.getDatasource();
 | 
	
		
			
				|  |  |         this.sqlFiledCondition = paramsMap.getString("sqlFiledCondition");
 | 
	
		
			
				|  |  |         this.sqlFiledValue = flJobConfig.getSqlFieldValue();
 | 
	
		
			
				|  |  |         //没有传增量值,以数据库配置的默认值为查询条件
 | 
	
		
			
				|  |  |         if(StringUtils.isEmpty(this.sqlFiledValue)){
 | 
	
		
			
				|  |  |             this.sqlFiledValue = this.flJobConfig.getSqlFieldValue();
 | 
	
	
		
			
				|  | @ -122,7 +132,7 @@ public class Mysql2ESJob implements Job {
 | 
	
		
			
				|  |  |      * 提取数据,按数据库中配置的增量条件提取
 | 
	
		
			
				|  |  |      */
 | 
	
		
			
				|  |  |     public void extract(){
 | 
	
		
			
				|  |  |         this.dataModelList = mysqlExtracter.extractDataByJobConfigsql(this.finalSql);
 | 
	
		
			
				|  |  |         this.dataModelList = mysqlExtracter.extractDataByJobConfigsql(this.finalSql,this.datasource);
 | 
	
		
			
				|  |  |     }
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  |     /**
 | 
	
	
		
			
				|  | @ -200,10 +210,10 @@ public class Mysql2ESJob implements Job {
 | 
	
		
			
				|  |  |     public String getFinalSql(String sql, String sqlFiled, String sqlFiledCondition, String sqlFiledValue) {
 | 
	
		
			
				|  |  |         StringBuilder result = new StringBuilder();
 | 
	
		
			
				|  |  |         result.append(sql).append(" ");
 | 
	
		
			
				|  |  |         if (sqlFiledValue.contains(",") && sql.contains("where")) {
 | 
	
		
			
				|  |  |         if (sqlFiledValue.contains(",") && (sql.contains("where") || sql.contains("WHERE"))) {
 | 
	
		
			
				|  |  |             String[] sqlFiledValues = sqlFiledValue.split(",");
 | 
	
		
			
				|  |  |             result.append("and ").append(sqlFiled).append(sqlFiledCondition).append("\'"+sqlFiledValues[1]+"\'").append(" and ").append(sqlFiled).append(sqlFiledCondition).append("\'"+sqlFiledValues[0]+"\'");
 | 
	
		
			
				|  |  |         }else if(sql.contains("where")){
 | 
	
		
			
				|  |  |         }else if((sql.contains("where") || sql.contains("WHERE"))){
 | 
	
		
			
				|  |  |             result.append("and ").append(sqlFiled).append(sqlFiledCondition).append("\'"+sqlFiledValue+"\'");
 | 
	
		
			
				|  |  |         }else if (sql.contains("$")){
 | 
	
		
			
				|  |  |             StringBuffer otherCondition = new StringBuffer();
 |