Browse Source

Merge branch 'dev-1.13.0' of http://192.168.1.220:10080/EHR/svr-quota into dev-1.13.0

Airhead 6 years ago
parent
commit
c8da11e1b7

+ 8 - 2
src/main/java/com/yihu/quota/kafka/ConsumerListener.java

@ -1,6 +1,8 @@
package com.yihu.quota.kafka;
import com.yihu.quota.service.cube.ElasticSearchMappingService;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
/**
@ -8,11 +10,15 @@ import org.springframework.kafka.annotation.KafkaListener;
 * kafka消费监听
 */
public class ConsumerListener {
    @Autowired
    private ElasticSearchMappingService elasticSearchMappingService;
    @KafkaListener(topics = "sep-hbase-data")
    public void loadData(ConsumerRecord<?, ?> record) {
        System.out.println("-- " + record.key() + " - " + record.value());
        System.out.println("kafka data: " + record.key() + " - " + record.value());
        if(record.value() != null){
            elasticSearchMappingService.saveData(record.value().toString());
        }
    }
}

+ 121 - 0
src/main/java/com/yihu/quota/service/cube/ElasticSearchMappingService.java

@ -0,0 +1,121 @@
package com.yihu.quota.service.cube;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.gson.Gson;
import com.yihu.ehr.elasticsearch.ElasticSearchPool;
import com.yihu.ehr.elasticsearch.ElasticSearchUtil;
import com.yihu.quota.util.ElasticSearchHandler;
import org.elasticsearch.client.transport.TransportClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.io.IOException;
import java.text.ParseException;
import java.util.HashMap;
import java.util.Map;
/**
 * Created by janseny on 2018/9/18.
 */
@Service
public class ElasticSearchMappingService {
    private static Logger logger = LoggerFactory.getLogger(ElasticSearchMappingService.class);
    private static String dataSource_hbase = "hbase";
    private static String dataSource_mysql = "mysql";
    private static String action_put = "Put";//添加和修改单个字段值
    private static String action_del = "DeleteColumn";//删除单个字段值
    private static String action_delFamily = "DeleteFamily";//删除整行
    @Autowired
    private ObjectMapper objectMapper;
    @Autowired
    private CubeMappingService cubeMappingService;
    @Autowired
    private CubeService cubeService;
    @Autowired
    private CubeMemberMappingService cubeMemberMappingService;
    @Autowired
    private ElasticSearchPool elasticSearchPool;
    @Autowired
    private ElasticSearchUtil elasticSearchUtil;
    @Autowired
    private ElasticSearchHandler elasticSearchHandler;
    /**
     *
     * @param data json 数据串
     */
    public void saveData(String data){
        try {
            Gson gson = new Gson();
            Map<String, Object> dataMap = gson.fromJson(data, Map.class);
            if(dataMap.containsKey("dataSource")){
                String dataSource = dataMap.get("dataSource").toString();
                dataMap.remove("dataSource");
                if(dataSource.toLowerCase().equals(dataSource_hbase)){
                     hbaseDataProcess(dataMap);
                }else if(dataSource.toLowerCase().equals(dataSource_mysql)){
                     mysqlDataProcess(dataMap);
                }
            }
        } catch (Exception e) {
            logger.debug("json数据转换异常");
            e.getMessage();
        }
    }
    /**
     * @param dataMap
     */
    public void hbaseDataProcess(Map<String, Object> dataMap){
        Map<String, Object> source  = new HashMap<>();
        String index = "";
        String type = "";
        String table = dataMap.get("table").toString();
        //通过表找到 对应的数据集 保存的索引和type
        //TODO 可以维护到数据字典 - 保存到redis 减少去数据库里面查询
        String rowKey = dataMap.get("rowKey").toString();
        String action = dataMap.get("action").toString();
        dataMap.remove("table");
        dataMap.remove("rowKey");
        dataMap.remove("action");
        try {
            if(action.contains(action_put)){
                //保存数据
                elasticSearchUtil.index(index, type, dataMap);
            }else if (action.contains(action_del)){
                for(String key : dataMap.keySet()){
                    dataMap.put(key,"");
                }
                //保存数据
               elasticSearchUtil.index(index, type, dataMap);
            }else if(action.contains(action_delFamily)){
                elasticSearchUtil.delete(index,type,rowKey);
            }
        }catch (ParseException e){
            logger.debug("elasticSearch 执行失败");
            e.printStackTrace();
            e.getMessage();
        }
    }
    /**
     * @param dataMap
     */
    public Map<String, Object>  mysqlDataProcess(Map<String, Object> dataMap){
        Map<String, Object> source  = new HashMap<>();
        String table = dataMap.get("table").toString();
        String rowKey = dataMap.get("rowKey").toString();
        //处理 数据库执行动作 增删改
        //TODO
        return source;
    }
}

+ 3 - 7
src/main/java/com/yihu/quota/util/ElasticSearchHandler.java

@ -169,13 +169,11 @@ public class ElasticSearchHandler {
                        mapping.startObject(field).field("type", "string").field("index", "not_analyzed").endObject();
                    }else if(participle == 2) {
                        mapping.startObject(field).field("type", "string").endObject();
                    }else {
                        mapping.startObject(field).field("type", "keyword").field("index", "not_analyzed").endObject();
                    }
                }else if("date".equals(dataType)){
                    mapping.startObject(field).field("type", dataType).field("format", "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis").endObject();
                    mapping.startObject(field).field("type", "date").field("format", "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis").endObject();
                }else if("nested".equals(dataType)){//子集属性
                    mapping.startObject(field).startObject("properties");
                    mapping.startObject(field).field("type", "nested").startObject("properties");
                    //子属性
                    List<FieldInfo> childFieldList = info.getFieldInfos();
                    for(FieldInfo child : childFieldList){
@ -187,11 +185,9 @@ public class ElasticSearchHandler {
                                mapping.startObject(childField).field("type", "string").field("index", "not_analyzed").endObject();
                            }else if(childParticiple == 2) {
                                mapping.startObject(childField).field("type", "string").endObject();
                            }else {
                                mapping.startObject(childField).field("type", "keyword").field("index", "not_analyzed").endObject();
                            }
                        }else if("date".equals(dataType)){
                            mapping.startObject(childField).field("type", childDataType).field("format", "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis").endObject();
                            mapping.startObject(childField).field("type", "date").field("format", "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis").endObject();
                        }else {
                            mapping.startObject(childField) .field("type", childDataType).endObject();
                        }