Sfoglia il codice sorgente

数据解析 增加 数据关联查询

jkzlzhoujie 6 anni fa
parent
commit
d2f06bb7ec

+ 0 - 53
src/main/java/com/yihu/quota/dao/cube/JdbcBasicDao.java

@ -1,53 +0,0 @@
package com.yihu.quota.dao.cube;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service;
import java.sql.*;
/**
 * Created by janseny on 2018/10/29.
 */
@Configuration
@Component
public class JdbcBasicDao {
    @Value("${spring.datasource.url}")
    private String datasourceUrl;
    @Value("${spring.datasource.username}")
    private String datasourceUsername;
    @Value("${spring.datasource.password}")
    private String datasourcePassword;
    public String getConnection(String sql ,String database,String cloumnCode) throws Exception{
        try {
            String URL = datasourceUrl;
            String oldDatabase = datasourceUrl.substring(datasourceUrl.lastIndexOf("/")+1,datasourceUrl.indexOf("?"));
            URL = URL.replaceAll(oldDatabase, database);
            //1.加载驱动程序
            Class.forName("com.mysql.jdbc.Driver");
            //2.获得数据库链接
            Connection conn = DriverManager.getConnection(URL, datasourceUsername, datasourcePassword);
            //3.通过数据库的连接操作数据库,实现增删改查(使用Statement类)
            Statement st = conn.createStatement();
            ResultSet rs = st.executeQuery(sql);
            //4.处理数据库的返回结果(使用ResultSet类)
            while(rs.next()){
                return rs.getString(cloumnCode);
            }
            //关闭资源
            rs.close();
            st.close();
            conn.close();
        }catch (SQLException sqlException){
            sqlException.printStackTrace();
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        }
        return null;
    }
}

+ 0 - 1
src/main/java/com/yihu/quota/kafka/ConsumerListener.java

@ -15,7 +15,6 @@ public class ConsumerListener {
    @KafkaListener(topics = "sep-hbase-data")
    public void loadData(ConsumerRecord<?, ?> record) {
        System.out.println("kafka data: " + record.key() + " - " + record.value());
        if(record.value() != null){
            elasticSearchDataProcessService.saveData(record.value().toString());
        }

+ 5 - 13
src/main/java/com/yihu/quota/kafka/Producer.java

@ -4,6 +4,8 @@ import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
@ -20,24 +22,14 @@ import java.util.Map;
 * @date 2018/9/14
 */
@Component
@Configuration
public class Producer {
    private static final Logger logger = LoggerFactory.getLogger(Producer.class);
    public static String sepTopic = "sep-hbase-data";
    @Value("${kafka.broker.address}")
    private String kafkaBrokerAddress;
    public ProducerFactory<String, String> producerFactory() {
        String kafkaBrokerAddress = "";
        try {
            Yaml yaml = new Yaml();
            URL url = Producer.class.getClassLoader().getResource("application.yml");
            if (url != null) {
                Map map = (Map) yaml.load(new FileInputStream(url.getFile()));
                Map map2 = (Map) map.get("kafka");
                Map map3 = (Map) map2.get("broker");
                kafkaBrokerAddress = map3.get("address").toString();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBrokerAddress);
        props.put(ProducerConfig.RETRIES_CONFIG, 0);

+ 1 - 1
src/main/java/com/yihu/quota/service/cube/ElasticSearchDataProcessService.java

@ -33,7 +33,7 @@ public class ElasticSearchDataProcessService {
    private static String dataSource_mysql = "mysql";
    private static String action_del = "DeleteFamily";//删除整行
    private static String action_put = "Put";                //添加和修改单个字段值
    private static String action_delAll = "delAll";
    private static String action_delAll = "DelAll";
    private static String dataSource_k = "dataSource";
    private static String database_k = "database";

+ 14 - 5
src/main/java/com/yihu/quota/service/cube/JdbcBasicService.java

@ -1,10 +1,13 @@
package com.yihu.quota.service.cube;
import com.yihu.quota.dao.cube.JdbcBasicDao;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Service;
import java.sql.*;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
 * Created by janseny on 2018/10/29.
@ -13,17 +16,23 @@ import java.sql.*;
public class JdbcBasicService {
    @Autowired
    private JdbcBasicDao jdbcBasicDao;
    private JdbcTemplate jdbcTemplate;
    public String getEntityByRelationId(String database ,String table, String cloumnCode ,String relationCode,String relationColDataType ,String relationId) throws Exception {
        String sql = "";
        Map<String, Object> map = new HashMap<>();
        if(relationColDataType.toLowerCase().equals("string")){
            sql = " SELECT " + cloumnCode + " FROM " + table + " WHERE " + relationCode + "= '" + relationId + "' ";
            sql = " SELECT " + cloumnCode + " FROM " + database + "." + table + " WHERE " + relationCode + "= '" + relationId + "' ";
        }else {
            sql = " SELECT " + cloumnCode + " FROM " + table + " WHERE " + relationCode + "= " + relationId;
            sql = " SELECT " + cloumnCode + " FROM " + database + "." + table + " WHERE " + relationCode + "= " + relationId;
        }
        List<Map<String, Object>> list = jdbcTemplate.queryForList(sql);
        if(list != null && list.size() > 0){
            map = list.get(0);
        }
        return jdbcBasicDao.getConnection(sql,database,cloumnCode);
        return map.get(cloumnCode).toString();
    }
}

+ 11 - 3
src/main/java/com/yihu/quota/service/job/SingleTableJob.java

@ -158,9 +158,10 @@ public class SingleTableJob implements Job {
    private void cleanData() {
        if (JobConstant.ExecType.Full.equals(execType)) {
            Map<String, Object> dataMap = new HashMap<>(2);
            dataMap.put("dataSource", "mysql");
            dataMap.put("database", database);
            dataMap.put("table", table);
            dataMap.put("action", "delAll");
            dataMap.put("action", "DelAll");
            dataMap.put("cubeId", cubeId);
            Gson gson = new Gson();
@ -179,10 +180,12 @@ public class SingleTableJob implements Job {
        list.forEach(item -> {
            Map<String, Object> dataMap = new HashMap<>(item.size());
            dataMap.put("database", database);
            dataMap.put("dataSource", "mysql");
            dataMap.put("action", "Put");
            dataMap.put("table", table);
            item.forEach((key, value) -> {
                if (key.equals(primeKey)) {
                    dataMap.put("rowKey", value);
                    dataMap.put("rowkey", value);
                }
                dataMap.put(key, value);
@ -192,7 +195,12 @@ public class SingleTableJob implements Job {
            Gson gson = new Gson();
            String jsonData = gson.toJson(dataMap);
            logger.info("保存消息:{}",jsonData);
            producer.sendMessage(Producer.sepTopic, jsonData);
            try {
                Thread.sleep(50);
                producer.sendMessage(Producer.sepTopic, jsonData);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
    }