package com.yihu.quota.service.cube; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.gson.Gson; import com.yihu.ehr.elasticsearch.ElasticSearchPool; import com.yihu.ehr.elasticsearch.ElasticSearchUtil; import com.yihu.ehr.util.datetime.DateUtil; import com.yihu.quota.etl.formula.AgeGroupFunc; import com.yihu.quota.etl.formula.DictFunc; import com.yihu.quota.etl.formula.DivisionFunc; import com.yihu.quota.util.ElasticSearchHandler; import com.yihu.quota.vo.CubeMappingModel; import com.yihu.quota.vo.CubeMemberMappingModel; import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import java.text.NumberFormat; import java.text.ParseException; import java.util.Date; import java.util.HashMap; import java.util.List; import java.util.Map; /** * Created by janseny on 2018/9/18. */ @Service public class ElasticSearchDataProcessService { private static Logger logger = LoggerFactory.getLogger(ElasticSearchDataProcessService.class); private static String dataSource_hbase = "hbase"; private static String dataSource_mysql = "mysql"; private static String action_put = "Put";//添加和修改单个字段值 private static String action_del = "DeleteColumn";//删除单个字段值 private static String action_delFamily = "DeleteFamily";//删除整行 private static String dataSource_k = "dataSource"; private static String table_k = "table"; private static String rowKey_k = "rowkey"; private static String action_k = "action"; @Autowired private ObjectMapper objectMapper; @Autowired private CubeMappingService cubeMappingService; @Autowired private CubeMemberMappingService cubeMemberMappingService; @Autowired private ElasticSearchPool elasticSearchPool; @Autowired private ElasticSearchUtil elasticSearchUtil; /** * * @param data json 数据串 */ public void saveData(String data){ try { Gson gson = new Gson(); Map dataMap = gson.fromJson(data, Map.class); if(dataMap.containsKey(dataSource_k)){ String dataSource = dataMap.get(dataSource_k).toString(); dataMap.remove(dataSource_k); if(dataSource.toLowerCase().equals(dataSource_hbase)){ hbaseDataProcess(dataMap); }else if(dataSource.toLowerCase().equals(dataSource_mysql)){ mysqlDataProcess(dataMap); } } } catch (Exception e) { logger.debug("json数据转换异常"); e.getMessage(); } } /** * @param dataMap */ public void hbaseDataProcess(Map dataMap){ Map source = new HashMap<>(); String index = ""; String type = ""; String table = dataMap.get(table_k).toString(); //通过表找到 对应的数据集 保存的索引和type //TODO 可以维护到数据字典 - 保存到redis 减少去数据库里面查询 String rowKey = dataMap.get(rowKey_k).toString(); String action = dataMap.get(action_k).toString(); dataMap.remove(table_k); dataMap.remove(rowKey_k); dataMap.remove(action_k); try { if(action.contains(action_put)){ String keyValue = ""; for(String key : dataMap.keySet()){ if(dataMap.get(key)!= null){ keyValue = dataMap.get(key).toString(); } //根据列名 查找出 对应的维度code及是否要数据字典,是否通过算法扩展出来 // 是否是子集模式中 List cubeMappingModels = cubeMappingService.findCubeMappingModelsByFieldCode(table, key); if(cubeMappingModels != null && cubeMappingModels.size() > 0){ for(CubeMappingModel cubeMappingModel :cubeMappingModels){ String cloumnCode = cubeMappingModel.getDimensionCode(); DictFunc dictFunc = new DictFunc(); //字典扩展 if(StringUtils.isNotEmpty(cubeMappingModel.getDict()) && StringUtils.isEmpty(cubeMappingModel.getAlgorithm())){ String param[] = {cubeMappingModel.getDict(),keyValue}; String value = dictFunc.execute(param); String dictCode = cloumnCode + ".Code"; String dictName = cloumnCode + ".Name"; source.put(dictCode,key); source.put(dictName,value); }else if(StringUtils.isNotEmpty(cubeMappingModel.getAlgorithm())){ //计算后 又经过字典 如:年龄段 if(cubeMappingModel.getAlgorithm().equals("AgeGroupFunc") && StringUtils.isNotEmpty(cubeMappingModel.getParm())){ AgeGroupFunc ageGroupFunc = new AgeGroupFunc(); String ageGroup = ageGroupFunc.execute(keyValue); String param[] = {cubeMappingModel.getDict(),ageGroup}; String value = dictFunc.execute(param); if(StringUtils.isNotEmpty(cubeMappingModel.getDict())){ String dictCode = cloumnCode + ".Code"; String dictName = cloumnCode + ".Name"; source.put(dictCode,key); source.put(dictName,value); } } //其他具体维度 对应具体算法 }else { if(StringUtils.isNotEmpty(cubeMappingModel.getDataType())){ String dataType = cubeMappingModel.getDataType(); Object value = dataConver(dataType,keyValue); source.put(cloumnCode,value); }else{ source.put(cloumnCode,dataMap.get(key)); } } } } List cubeMemberMappingModels = cubeMemberMappingService.findCubeMemberMappingModels(table,key); if(cubeMemberMappingModels != null && cubeMemberMappingModels.size() > 0){ for(CubeMemberMappingModel cubeMemberMappingModel :cubeMemberMappingModels){ String cloumnCode = cubeMemberMappingModel.getDimensionCode(); String parentCode = cubeMemberMappingModel.getParentCode(); if(cubeMemberMappingModel.getChildSaveType() != null){ int childSaveType = cubeMemberMappingModel.getChildSaveType(); if(childSaveType == 1 ){//对象方式 //拼装成数据结构 String field = parentCode + "." + cloumnCode; source.put(field,keyValue); } if(childSaveType == 2 ){//nested 方式 //查出历史数据 然后组合保存 String field = parentCode + ".subRowkey"; List> subDataList = elasticSearchUtil.findByField(index, type, field, rowKey); if(subDataList != null && subDataList.size() > 0){ String parentRowkey = subDataList.get(0).get(rowKey_k).toString(); List> dataList = elasticSearchUtil.findByField(index, type, rowKey_k, parentRowkey); if(dataList != null && dataList.size() > 0){ //组装 子集历史数据 在添加 }else { //单条添加 } } } }else { //字典扩展 if(StringUtils.isNotEmpty(cubeMemberMappingModel.getDict())){ DictFunc dictFunc = new DictFunc(); String param[] = {cubeMemberMappingModel.getDict(),cloumnCode}; String value = dictFunc.execute(param); String dictCode = cloumnCode + ".Code"; String dictName = cloumnCode + ".Name"; source.put(dictCode,key); source.put(dictName,value); }else if(StringUtils.isNotEmpty(cubeMemberMappingModel.getAlgorithm())){ if(cubeMemberMappingModel.getAlgorithm().equals("DivisionFunc") && StringUtils.isNotEmpty(cubeMemberMappingModel.getParm())){ if(cubeMemberMappingModel.getParm().equals(key)){ DivisionFunc divisionFunc = new DivisionFunc(); String townParam[] = {dataMap.get(key).toString(),"1"}; String townVal = divisionFunc.execute(townParam); String cityParam[] = {dataMap.get(key).toString(),"2"}; String cityVal = divisionFunc.execute(cityParam); String provinceParam[] = {dataMap.get(key).toString(),"3"}; String provinceVal = divisionFunc.execute(provinceParam); source.put("",townVal); source.put("",cityVal); source.put("",provinceVal); } } }else { if(StringUtils.isNotEmpty(cubeMemberMappingModel.getDataType())){ String dataType = cubeMemberMappingModel.getDataType(); Object value = dataConver(dataType,keyValue); source.put(cloumnCode,value); }else{ source.put(cloumnCode,dataMap.get(key)); } } } } } } elasticSearchUtil.index(index, type, dataMap); }else if (action.contains(action_del)){ for(String key : dataMap.keySet()){ dataMap.put(key,""); } //保存数据 elasticSearchUtil.index(index, type, dataMap); }else if(action.contains(action_delFamily)){ elasticSearchUtil.delete(index,type,rowKey); } }catch (ParseException e){ logger.debug("elasticSearch 执行失败"); e.printStackTrace(); e.getMessage(); } catch (Exception e) { logger.debug("数据解析异常"); e.printStackTrace(); } } /** * @param dataMap */ public Map mysqlDataProcess(Map dataMap){ Map source = new HashMap<>(); String table = dataMap.get(table_k).toString(); String rowKey = dataMap.get(rowKey_k).toString(); //处理 数据库执行动作 增删改 //TODO return source; } /** * 数据类型转换 * @param dataType * @param keyValue */ public Object dataConver(String dataType,String keyValue){ NumberFormat nf = NumberFormat.getInstance(); Object value = null; dataType = dataType.toLowerCase(); if(dataType.equals("string")){ value = keyValue; }else if(dataType.equals("int")){ int intValue = Integer.valueOf(keyValue); value = intValue; }else if(dataType.equals("double")){ nf.setGroupingUsed(false); nf.setMaximumFractionDigits(2); double doubleValue = Double.valueOf(keyValue); value = doubleValue; }else if(dataType.equals("date")){ Date dateValue = DateUtil.formatCharDateYMDHMS(keyValue); value = dateValue; } return value; } }