| 
					
				 | 
			
			
				@ -1,10 +1,10 @@ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				package com.yihu.quota.service.job; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				import com.google.gson.Gson; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				import com.yihu.ehr.util.datetime.DateUtil; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				import com.yihu.quota.contants.JobConstant; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				import com.yihu.quota.kafka.Producer; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				import com.yihu.quota.util.sql.DbKit; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				import org.apache.commons.lang.StringUtils; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				import org.quartz.*; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				import org.slf4j.Logger; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				import org.slf4j.LoggerFactory; 
			 | 
		
	
	
		
			
				| 
					
				 | 
			
			
				@ -14,7 +14,6 @@ import org.springframework.jdbc.core.JdbcTemplate; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				import org.springframework.stereotype.Component; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				import org.springframework.web.context.support.SpringBeanAutowiringSupport; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				import java.util.Date; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				import java.util.HashMap; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				import java.util.List; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				import java.util.Map; 
			 | 
		
	
	
		
			
				| 
					
				 | 
			
			
				@ -36,6 +35,11 @@ import java.util.Map; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				public class SingleTableJob implements Job { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				    static private Logger logger = LoggerFactory.getLogger(SingleTableJob.class); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				    /** 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     * 数据来源库 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     */ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				    protected String database; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				    /** 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     * 数据来源表 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     */ 
			 | 
		
	
	
		
			
				| 
					
				 | 
			
			
				@ -75,6 +79,16 @@ public class SingleTableJob implements Job { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     */ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				    protected JobConstant.ExecType execType; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				    /** 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     * 查询列 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     */ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				    protected String searchColumn; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				    /** 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     * 数据集id 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     */ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				    protected String cubeId; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				    @Autowired 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				    private Producer producer; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				    @Autowired 
			 | 
		
	
	
		
			
				| 
					
				 | 
			
			
				@ -87,11 +101,8 @@ public class SingleTableJob implements Job { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				        cleanData(); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				        List<Map<String, Object>> list; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				        do { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				            list = fetch(); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				            saveData(list); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				        } while (list != null && list.size() != 0); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				        list = fetch(); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				        saveData(list); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				    } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				    private void prepare(JobExecutionContext jobExecutionContext) { 
			 | 
		
	
	
		
			
				| 
					
				 | 
			
			
				@ -99,6 +110,7 @@ public class SingleTableJob implements Job { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				        SpringBeanAutowiringSupport.processInjectionBasedOnCurrentContext(this); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				        JobDataMap jobDataMap = jobExecutionContext.getJobDetail().getJobDataMap(); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				        database = jobDataMap.getString("database"); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				        table = jobDataMap.getString("table"); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				        primeKey = jobDataMap.getString("primeKey"); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				        filterField = jobDataMap.getString("filterField"); 
			 | 
		
	
	
		
			
				| 
					
				 | 
			
			
				@ -106,16 +118,22 @@ public class SingleTableJob implements Job { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				        size = jobDataMap.getString("size"); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				        start = jobDataMap.getString("start"); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				        end = jobDataMap.getString("end"); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				        execType = JobConstant.ExecType.fromInt(jobDataMap.getIntValue("execType")); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				        searchColumn = jobDataMap.getString("searchColumn"); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				        cubeId = jobDataMap.getString("cubeId"); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				    } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				    private void cleanData() { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				        if (JobConstant.ExecType.Full.equals(execType)) { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				            Map<String, Object> dataMap = new HashMap<>(2); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				            dataMap.put("database", database); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				            dataMap.put("table", table); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				            dataMap.put("delAll", true); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				            dataMap.put("action", "delAll"); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				            dataMap.put("cubeId", cubeId); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				            Gson gson = new Gson(); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				            String jsonData = gson.toJson(dataMap); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				            logger.info("清除消息:{}",jsonData); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				            producer.sendMessage(Producer.sepTopic, jsonData); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				        } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				    } 
			 | 
		
	
	
		
			
				| 
					
				 | 
			
			
				@ -128,6 +146,7 @@ public class SingleTableJob implements Job { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				        list.forEach(item -> { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				            Map<String, Object> dataMap = new HashMap<>(item.size()); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				            dataMap.put("database", database); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				            dataMap.put("table", table); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				            item.forEach((key, value) -> { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				                if (key.equals(primeKey)) { 
			 | 
		
	
	
		
			
				| 
					
				 | 
			
			
				@ -140,18 +159,55 @@ public class SingleTableJob implements Job { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				            Gson gson = new Gson(); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				            String jsonData = gson.toJson(dataMap); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				            logger.info("保存消息:{}",jsonData); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				            producer.sendMessage(Producer.sepTopic, jsonData); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				        }); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				    } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				    private List<Map<String, Object>> fetch() { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				        StringBuilder sb = new StringBuilder(); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				        if (StringUtils.isNotEmpty(searchColumn)) { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				            sb.append("select ").append(primeKey).append(",").append(searchColumn).append(" from ").append(database).append(".").append(table); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				        } else { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				            sb.append("select * from ").append(database).append(".").append(table); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				        } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				        if (StringUtils.isNotEmpty(filterField) && (StringUtils.isNotEmpty(start) || StringUtils.isNotEmpty(end))) { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				            sb.append(" where "); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				            if ("number".equals(filterFieldType)) { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				                if (StringUtils.isNotEmpty(start) && StringUtils.isNotEmpty(end)) { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				                    sb.append(filterField).append(">=").append(start).append(" and ").append(filterField).append("<=").append(end); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				                } else if (StringUtils.isNotEmpty(start) && StringUtils.isEmpty(end)) { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				                    sb.append(filterField).append(">=").append(start); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				                } else if (StringUtils.isEmpty(start) && StringUtils.isNotEmpty(end)) { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				                    sb.append(filterField).append("<=").append(end); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				                } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				            } else if ("date".equals(filterFieldType)) { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				                if (StringUtils.isNotEmpty(start) && StringUtils.isNotEmpty(end)) { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				                    sb.append(filterField).append(">=").append(DbKit.use().getLongDate(start)).append(" and ") 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				                            .append(filterField).append("<=").append(DbKit.use().getLongDate(end)); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				                } else if (StringUtils.isNotEmpty(start) && StringUtils.isEmpty(end)) { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				                    sb.append(filterField).append(">=").append(DbKit.use().getLongDate(start)); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				                } else if (StringUtils.isEmpty(start) && StringUtils.isNotEmpty(end)) { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				                    sb.append(filterField).append("<=").append(DbKit.use().getLongDate(end)); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				                } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				            } else { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				                logger.warn("不支持的过滤字段类型"); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				                return null; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				            } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				        } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				        logger.info("sql={}",sb.toString()); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				        return jdbcTemplate.queryForList(sb.toString()); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				    } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				    /** 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     * TODO:没有设置数据库来源和数据库类型,当前使用默认数据库 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     * 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     * @return 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     */ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				    private List<Map<String, Object>> fetch() { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				    /*private List<Map<String, Object>> fetch() { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				        String sql = ""; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				        if ("number".equals(filterFieldType)) { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				            Long lngTemp = Long.parseLong(start) + Long.parseLong(size); 
			 | 
		
	
	
		
			
				| 
					
				 | 
			
			
				@ -181,5 +237,5 @@ public class SingleTableJob implements Job { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				        } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				        return jdbcTemplate.queryForList(sql); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				    } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				    }*/ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				} 
			 |