|
- package com.yihu.quota.service.cube;
- import com.fasterxml.jackson.databind.ObjectMapper;
- import com.google.gson.Gson;
- import com.yihu.ehr.elasticsearch.ElasticSearchPool;
- import com.yihu.ehr.elasticsearch.ElasticSearchUtil;
- import com.yihu.ehr.util.datetime.DateUtil;
- import com.yihu.quota.etl.formula.AgeGroupFunc;
- import com.yihu.quota.etl.formula.DictFunc;
- import com.yihu.quota.etl.formula.DivisionFunc;
- import com.yihu.quota.util.ElasticSearchHandler;
- import com.yihu.quota.vo.CubeMappingModel;
- import com.yihu.quota.vo.CubeMemberMappingModel;
- import org.apache.commons.lang.StringUtils;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.stereotype.Service;
- import java.text.NumberFormat;
- import java.text.ParseException;
- import java.util.Date;
- import java.util.HashMap;
- import java.util.List;
- import java.util.Map;
- /**
- * Created by janseny on 2018/9/18.
- */
- @Service
- public class ElasticSearchDataProcessService {
- private static Logger logger = LoggerFactory.getLogger(ElasticSearchDataProcessService.class);
- private static String dataSource_hbase = "hbase";
- private static String dataSource_mysql = "mysql";
- private static String action_put = "Put";//添加和修改单个字段值
- private static String action_del = "DeleteColumn";//删除单个字段值
- private static String action_delFamily = "DeleteFamily";//删除整行
- private static String dataSource_k = "dataSource";
- private static String table_k = "table";
- private static String rowKey_k = "rowkey";
- private static String action_k = "action";
- @Autowired
- private ObjectMapper objectMapper;
- @Autowired
- private CubeMappingService cubeMappingService;
- @Autowired
- private CubeMemberMappingService cubeMemberMappingService;
- @Autowired
- private ElasticSearchPool elasticSearchPool;
- @Autowired
- private ElasticSearchUtil elasticSearchUtil;
- /**
- *
- * @param data json 数据串
- */
- public void saveData(String data){
- try {
- Gson gson = new Gson();
- Map<String, Object> dataMap = gson.fromJson(data, Map.class);
- if(dataMap.containsKey(dataSource_k)){
- String dataSource = dataMap.get(dataSource_k).toString();
- dataMap.remove(dataSource_k);
- if(dataSource.toLowerCase().equals(dataSource_hbase)){
- hbaseDataProcess(dataMap);
- }else if(dataSource.toLowerCase().equals(dataSource_mysql)){
- mysqlDataProcess(dataMap);
- }
- }
- } catch (Exception e) {
- logger.debug("json数据转换异常");
- e.getMessage();
- }
- }
- /**
- * @param dataMap
- */
- public void hbaseDataProcess(Map<String, Object> dataMap){
- Map<String, Object> source = new HashMap<>();
- String index = "";
- String type = "";
- String table = dataMap.get(table_k).toString();
- //通过表找到 对应的数据集 保存的索引和type
- //TODO 可以维护到数据字典 - 保存到redis 减少去数据库里面查询
- String rowKey = dataMap.get(rowKey_k).toString();
- String action = dataMap.get(action_k).toString();
- dataMap.remove(table_k);
- dataMap.remove(rowKey_k);
- dataMap.remove(action_k);
- try {
- if(action.contains(action_put)){
- String keyValue = "";
- for(String key : dataMap.keySet()){
- if(dataMap.get(key)!= null){
- keyValue = dataMap.get(key).toString();
- }
- //根据列名 查找出 对应的维度code及是否要数据字典,是否通过算法扩展出来
- // 是否是子集模式中
- List<CubeMappingModel> cubeMappingModels = cubeMappingService.findCubeMappingModelsByFieldCode(table, key);
- if(cubeMappingModels != null && cubeMappingModels.size() > 0){
- for(CubeMappingModel cubeMappingModel :cubeMappingModels){
- String cloumnCode = cubeMappingModel.getDimensionCode();
- DictFunc dictFunc = new DictFunc();
- //字典扩展
- if(StringUtils.isNotEmpty(cubeMappingModel.getDict()) && StringUtils.isEmpty(cubeMappingModel.getAlgorithm())){
- String param[] = {cubeMappingModel.getDict(),keyValue};
- String value = dictFunc.execute(param);
- String dictCode = cloumnCode + ".Code";
- String dictName = cloumnCode + ".Name";
- source.put(dictCode,key);
- source.put(dictName,value);
- }else if(StringUtils.isNotEmpty(cubeMappingModel.getAlgorithm())){
- //计算后 又经过字典 如:年龄段
- if(cubeMappingModel.getAlgorithm().equals("AgeGroupFunc") && StringUtils.isNotEmpty(cubeMappingModel.getParm())){
- AgeGroupFunc ageGroupFunc = new AgeGroupFunc();
- String ageGroup = ageGroupFunc.execute(keyValue);
- String param[] = {cubeMappingModel.getDict(),ageGroup};
- String value = dictFunc.execute(param);
- if(StringUtils.isNotEmpty(cubeMappingModel.getDict())){
- String dictCode = cloumnCode + ".Code";
- String dictName = cloumnCode + ".Name";
- source.put(dictCode,key);
- source.put(dictName,value);
- }
- }
- //其他具体维度 对应具体算法
- }else {
- if(StringUtils.isNotEmpty(cubeMappingModel.getDataType())){
- String dataType = cubeMappingModel.getDataType();
- Object value = dataConver(dataType,keyValue);
- source.put(cloumnCode,value);
- }else{
- source.put(cloumnCode,dataMap.get(key));
- }
- }
- }
- }
- List<CubeMemberMappingModel> cubeMemberMappingModels = cubeMemberMappingService.findCubeMemberMappingModels(table,key);
- if(cubeMemberMappingModels != null && cubeMemberMappingModels.size() > 0){
- for(CubeMemberMappingModel cubeMemberMappingModel :cubeMemberMappingModels){
- String cloumnCode = cubeMemberMappingModel.getDimensionCode();
- String parentCode = cubeMemberMappingModel.getParentCode();
- if(cubeMemberMappingModel.getChildSaveType() != null){
- int childSaveType = cubeMemberMappingModel.getChildSaveType();
- if(childSaveType == 1 ){//对象方式
- //拼装成数据结构
- String field = parentCode + "." + cloumnCode;
- source.put(field,keyValue);
- }
- if(childSaveType == 2 ){//nested 方式
- //查出历史数据 然后组合保存
- String field = parentCode + ".subRowkey";
- List<Map<String, Object>> subDataList = elasticSearchUtil.findByField(index, type, field, rowKey);
- if(subDataList != null && subDataList.size() > 0){
- String parentRowkey = subDataList.get(0).get(rowKey_k).toString();
- List<Map<String, Object>> dataList = elasticSearchUtil.findByField(index, type, rowKey_k, parentRowkey);
- if(dataList != null && dataList.size() > 0){
- //组装 子集历史数据 在添加
- }else {
- //单条添加
- }
- }
- }
- }else {
- //字典扩展
- if(StringUtils.isNotEmpty(cubeMemberMappingModel.getDict())){
- DictFunc dictFunc = new DictFunc();
- String param[] = {cubeMemberMappingModel.getDict(),cloumnCode};
- String value = dictFunc.execute(param);
- String dictCode = cloumnCode + ".Code";
- String dictName = cloumnCode + ".Name";
- source.put(dictCode,key);
- source.put(dictName,value);
- }else if(StringUtils.isNotEmpty(cubeMemberMappingModel.getAlgorithm())){
- if(cubeMemberMappingModel.getAlgorithm().equals("DivisionFunc") && StringUtils.isNotEmpty(cubeMemberMappingModel.getParm())){
- if(cubeMemberMappingModel.getParm().equals(key)){
- DivisionFunc divisionFunc = new DivisionFunc();
- String townParam[] = {dataMap.get(key).toString(),"1"};
- String townVal = divisionFunc.execute(townParam);
- String cityParam[] = {dataMap.get(key).toString(),"2"};
- String cityVal = divisionFunc.execute(cityParam);
- String provinceParam[] = {dataMap.get(key).toString(),"3"};
- String provinceVal = divisionFunc.execute(provinceParam);
- source.put("",townVal);
- source.put("",cityVal);
- source.put("",provinceVal);
- }
- }
- }else {
- if(StringUtils.isNotEmpty(cubeMemberMappingModel.getDataType())){
- String dataType = cubeMemberMappingModel.getDataType();
- Object value = dataConver(dataType,keyValue);
- source.put(cloumnCode,value);
- }else{
- source.put(cloumnCode,dataMap.get(key));
- }
- }
- }
- }
- }
- }
- elasticSearchUtil.index(index, type, dataMap);
- }else if (action.contains(action_del)){
- for(String key : dataMap.keySet()){
- dataMap.put(key,"");
- }
- //保存数据
- elasticSearchUtil.index(index, type, dataMap);
- }else if(action.contains(action_delFamily)){
- elasticSearchUtil.delete(index,type,rowKey);
- }
- }catch (ParseException e){
- logger.debug("elasticSearch 执行失败");
- e.printStackTrace();
- e.getMessage();
- } catch (Exception e) {
- logger.debug("数据解析异常");
- e.printStackTrace();
- }
- }
- /**
- * @param dataMap
- */
- public Map<String, Object> mysqlDataProcess(Map<String, Object> dataMap){
- Map<String, Object> source = new HashMap<>();
- String table = dataMap.get(table_k).toString();
- String rowKey = dataMap.get(rowKey_k).toString();
- //处理 数据库执行动作 增删改
- //TODO
- return source;
- }
- /**
- * 数据类型转换
- * @param dataType
- * @param keyValue
- */
- public Object dataConver(String dataType,String keyValue){
- NumberFormat nf = NumberFormat.getInstance();
- Object value = null;
- dataType = dataType.toLowerCase();
- if(dataType.equals("string")){
- value = keyValue;
- }else if(dataType.equals("int")){
- int intValue = Integer.valueOf(keyValue);
- value = intValue;
- }else if(dataType.equals("double")){
- nf.setGroupingUsed(false);
- nf.setMaximumFractionDigits(2);
- double doubleValue = Double.valueOf(keyValue);
- value = doubleValue;
- }else if(dataType.equals("date")){
- Date dateValue = DateUtil.formatCharDateYMDHMS(keyValue);
- value = dateValue;
- }
- return value;
- }
- }
|