瀏覽代碼

实现单表Job数据提取逻辑

Airhead 6 年之前
父節點
當前提交
e45155f678

+ 15 - 8
src/main/java/com/yihu/quota/kafka/Producer.java

@ -21,8 +21,8 @@ import java.util.Map;
 */
@Component
public class Producer {
    private static String topic = "sep-hbase-data";
    Logger logger = LoggerFactory.getLogger(Producer.class);
    private static final Logger logger = LoggerFactory.getLogger(Producer.class);
    public static String sepTopic = "sep-hbase-data";
    public ProducerFactory<String, String> producerFactory() {
        String kafkaBrokerAddress = "";
@ -45,19 +45,26 @@ public class Producer {
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(props);
    }
    public String sendMessage(String message) {
    public boolean sendMessage(String topic, String message) {
        try {
            if (logger.isInfoEnabled()) {
                logger.info("send Message success.");
            }
            KafkaTemplate<String, String> kafkaTemplate = new KafkaTemplate<>(producerFactory());
            logger.info("kafka的消息={}", message);
            kafkaTemplate.send(topic, message);
            logger.info("send Message success.");
            return "SUCCESS";
            return true;
        } catch (Exception e) {
            logger.error("send Message fail.", e);
            return "FAIL";
            if (logger.isErrorEnabled()) {
                logger.error("send Message fail." + "topic:" + topic + ",message:" + message + "error:" + e.getMessage(), e);
            }
            return false;
        }
    }

+ 39 - 7
src/main/java/com/yihu/quota/service/job/SingleTableJob.java

@ -1,8 +1,10 @@
package com.yihu.quota.service.job;
import com.google.gson.Gson;
import com.yihu.ehr.util.datetime.DateUtil;
import com.yihu.quota.contants.JobConstant;
import com.yihu.quota.kafka.Producer;
import com.yihu.quota.util.sql.DbKit;
import org.quartz.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -12,6 +14,7 @@ import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Component;
import org.springframework.web.context.support.SpringBeanAutowiringSupport;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -87,6 +90,7 @@ public class SingleTableJob implements Job {
        do {
            list = fetch();
            saveData(list);
        } while (list != null && list.size() != 0);
    }
@ -112,7 +116,7 @@ public class SingleTableJob implements Job {
            Gson gson = new Gson();
            String jsonData = gson.toJson(dataMap);
            producer.sendMessage(jsonData);
            producer.sendMessage(Producer.sepTopic, jsonData);
        }
    }
@ -136,17 +140,45 @@ public class SingleTableJob implements Job {
            Gson gson = new Gson();
            String jsonData = gson.toJson(dataMap);
            producer.sendMessage(jsonData);
            producer.sendMessage(Producer.sepTopic, jsonData);
        });
    }
    private List<Map<String, Object>> fetch() {
        //TODO:filterFieldType过滤
        String sql = "select * from " + table +
                " where " + filterField + ">=" + start + " and " + filterField + "<" + (start + size) + " and " +
                filterField + "<=" + end;
    /**
     * TODO:没有设置数据库来源和数据库类型,当前使用默认数据库
     *
     * @return
     */
    private List<Map<String, Object>> fetch() {
        String sql = "";
        if ("number".equals(filterFieldType)) {
            Long lngTemp = Long.parseLong(start) + Long.parseLong(size);
            String temp = lngTemp.toString();
            sql = "select * from " + table +
                    " where " + filterField + ">=" + start + " and " + filterField + "<" + temp + " and " +
                    filterField + "<=" + end;
            start = temp;
        } else if ("date".equals(filterFieldType)) {
            Date date = DateUtil.toDateFromTime(start);
            java.util.Calendar calendar = java.util.Calendar.getInstance();
            calendar.setTime(date);
            calendar.add(java.util.Calendar.HOUR_OF_DAY, Integer.parseInt(size));
            String temp = DateUtil.toString(calendar.getTime(), DateUtil.DEFAULT_YMDHMSDATE_FORMAT);
            sql = "select * from " + table +
                    " where " + filterField + ">=" + DbKit.use().getLongDate(start) + " and " + filterField + "<" + DbKit.use().getLongDate(temp) + " and " +
                    filterField + "<=" + DbKit.use().getLongDate(end);
            start = temp;
        } else {
            logger.warn("不支持的过滤字段类型");
            return null;
        }
        return jdbcTemplate.queryForList(sql);
    }

+ 23 - 0
src/main/java/com/yihu/quota/util/sql/Db.java

@ -0,0 +1,23 @@
package com.yihu.quota.util.sql;
public interface Db {
    String getDate(String value);
    String getLongDate(String value);
    String getNumber(String value);
    /**
     * 支持的数据类型
     */
    public enum Type {
        /**
         * oracle
         */
        oracle,
        /**
         * mysql
         */
        mysql
    }
}

+ 50 - 0
src/main/java/com/yihu/quota/util/sql/DbKit.java

@ -0,0 +1,50 @@
package com.yihu.quota.util.sql;
import java.util.HashMap;
public class DbKit {
    private Db defaultDb;
    private HashMap<Db.Type, Db> mapDB = new HashMap<>();
    private DbKit(Db.Type type) {
        Db db = mapDB.get(type);
        if (db != null) {
            defaultDb = db;
            return;
        }
        switch (type) {
            case mysql:
                db = new MysqlDb();
                break;
            case oracle:
                db = new OracleDb();
                break;
            default:
                break;
        }
        defaultDb = db;
        mapDB.put(type, db);
    }
    public static DbKit use() {
        return use(Db.Type.mysql);
    }
    public static DbKit use(Db.Type type) {
        return new DbKit(type);
    }
    public String getDate(String value) {
        return defaultDb.getDate(value);
    }
    public String getLongDate(String value) {
        return defaultDb.getLongDate(value);
    }
    public String getNumber(String value) {
        return defaultDb.getNumber(value);
    }
}

+ 18 - 0
src/main/java/com/yihu/quota/util/sql/MysqlDb.java

@ -0,0 +1,18 @@
package com.yihu.quota.util.sql;
public class MysqlDb implements Db {
    @Override
    public String getDate(String value) {
        return "str_to_date('" + value + "','%Y-%M-%D')";
    }
    @Override
    public String getLongDate(String value) {
        return "str_to_date('" + value + "','%Y-%m-%d %H:%i:%S')";
    }
    @Override
    public String getNumber(String value) {
        return "cast(" + value + " as signed integer)";
    }
}

+ 18 - 0
src/main/java/com/yihu/quota/util/sql/OracleDb.java

@ -0,0 +1,18 @@
package com.yihu.quota.util.sql;
public class OracleDb implements Db {
    @Override
    public String getDate(String value) {
        return "to_date('" + value + "','YYYY-MM-DD')";
    }
    @Override
    public String getLongDate(String value) {
        return "to_date('" + value + "','YYYY-MM-DD HH24:MI:SS')";
    }
    @Override
    public String getNumber(String value) {
        return "to_number(" + value + ")";
    }
}