package com.yihu.quota.service.cube; import com.fasterxml.jackson.databind.ObjectMapper; import com.yihu.ehr.elasticsearch.ElasticSearchUtil; import com.yihu.ehr.util.datetime.DateUtil; import com.yihu.quota.etl.formula.DictFunc; import com.yihu.quota.etl.formula.RelevanceFunc; import com.yihu.quota.model.cube.Cube; import com.yihu.quota.service.dimension.DimensionService; import com.yihu.quota.vo.CubeMappingModel; import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import java.text.NumberFormat; import java.text.ParseException; import java.util.*; /** * Created by janseny on 2018/9/18. */ @Service public class ElasticSearchDataProcessService { private static Logger logger = LoggerFactory.getLogger(ElasticSearchDataProcessService.class); private static String dataSource_hbase = "hbase"; private static String dataSource_mysql = "mysql"; private static String action_del = "DeleteFamily";//删除整行 private static String action_put = "Put"; //添加和修改单个字段值 private static String action_delAll = "DelAll"; //删除整张表数据 private static String action_putAll = "PutAll"; //添加整张表数据 private static String dataSource_k = "dataSource"; private static String database_k = "database"; private static String table_k = "table"; private static String id_k = "_id"; private static String cubeId_k = "cubeId"; private static String rowKey_k = "rowkey"; private static String profileId_k = "profile_id"; private static String action_k = "action"; @Autowired private CubeMappingService cubeMappingService; @Autowired private ElasticSearchUtil elasticSearchUtil; @Autowired private CubeService cubeService; @Autowired private ObjectMapper objectMapper; @Autowired private RelevanceFunc relevanceFunc; /** * * @param data json 数据串 */ public void saveData(String data){ try { Map dataMap = objectMapper.readValue(data,Map.class); if(dataMap.containsKey(dataSource_k)){ String dataSource = dataMap.get(dataSource_k).toString(); dataMap.remove(dataSource_k); if(dataSource.toLowerCase().equals(dataSource_hbase)){ dataProcess(dataMap); }else if(dataSource.toLowerCase().equals(dataSource_mysql)){ dataProcess(dataMap); } } } catch (Exception e) { logger.debug("json数据转换异常"); e.getMessage(); } } /** * @param dataMap * 如果是子集的数据 hbase 过来要指定父级数据 * 如果是 删除 子表中的一行数据,es 这边就无法确定删除哪个数据 */ public void dataProcess(Map dataMap) throws Exception{ String table = ""; String rowKey = ""; String profileId = ""; String cubeId = ""; String action = ""; String database = ""; if (dataMap.containsKey(database_k)) { database = dataMap.remove(database_k).toString(); dataMap.remove(database_k); } if(dataMap.containsKey(table_k)){ table = dataMap.get(table_k).toString(); dataMap.remove(table_k); } if(dataMap.containsKey(rowKey_k)){ rowKey = dataMap.get(rowKey_k).toString(); dataMap.remove(rowKey_k); } if(dataMap.containsKey(profileId_k)){ profileId = dataMap.get(profileId_k).toString(); dataMap.remove(profileId_k); } if(dataMap.containsKey(action_k)){ action = dataMap.get(action_k).toString(); dataMap.remove(action_k); } if(dataMap.containsKey(cubeId_k)){ cubeId = dataMap.get(cubeId_k).toString(); } if(StringUtils.isEmpty(profileId)){ profileId = rowKey; } String subRowKey = rowKey; try { String baseCloumnValue = null; if(action.equals(action_put)){ for(String baseCloumnCode : dataMap.keySet()){ if(dataMap.get(baseCloumnCode)!= null){ baseCloumnValue = dataMap.get(baseCloumnCode).toString(); List cubeMappingModels = cubeMappingService.findCubeMappingModelsByField(table, baseCloumnCode); if(cubeMappingModels != null && cubeMappingModels.size() > 0){ for(CubeMappingModel cubeMappingModel :cubeMappingModels){ // System.out.println("维度code = " + cubeMappingModel.getDimensionCode() + ",维度类型:" + cubeMappingModel.getDataType() + ",值=" + baseCloumnValue ); Map source = new HashMap<>(); if(cubeMappingModel.getParentId() == null){ source.put(id_k,rowKey); source.put(rowKey_k,rowKey); source.putAll(dimensionDataExtendToMap(cubeMappingModel,cubeMappingModel.getDimensionCode(),baseCloumnValue)); }else { // 子集成员 rowKey = profileId; source.put(id_k,profileId); source.put(rowKey_k,profileId); CubeMappingModel mappingModel = cubeMappingService.findParentDimension(cubeMappingModel.getParentId()); String parentCode = mappingModel.getDimensionCode(); cubeMappingModel.setParentCode(parentCode); cubeMappingModel.setChildSaveType(mappingModel.getChildSaveType()); source.putAll(dimensionMemberDataExtendToMap(cubeMappingModel,baseCloumnValue,subRowKey,profileId)); } String index = cubeMappingModel.getIndexName(); String type = cubeMappingModel.getIndexType(); saveElasticSearchData(index, type,rowKey,source); } } } } }else if(action.equals(action_del)){ //一个表只能对应到一个 索引type Cube cube = cubeMappingService.findCubeByTableCode(table); if(cube != null){ elasticSearchUtil.delete(cube.getIndexName(),cube.getIndexType(),rowKey); }else { throw new Exception("视图,表不存在"); } }else if (action.equals(action_putAll)) { if (dataMap.containsKey("cubeId")) { //采用redis Cube cube = cubeService.findOne(Integer.parseInt(cubeId)); if (null != cube) { String index = cube.getIndexName(); String type = cube.getIndexType(); List> dataList = (List>)dataMap.get("dataList"); List> sourcesList = new ArrayList<>(); for(Map oneDataMap : dataList){ Map source = new HashMap<>(); for(String baseCloumnCode : oneDataMap.keySet()){ source.put(id_k, oneDataMap.get(rowKey_k).toString()); source.put(rowKey_k,oneDataMap.get(rowKey_k).toString()); // System.out.println("列:" + baseCloumnCode ); if(oneDataMap.get(baseCloumnCode)!= null){ baseCloumnValue = oneDataMap.get(baseCloumnCode).toString(); //这个 可以采用保存到redis 方式减少数据库压力 List cubeMappingModels = cubeMappingService.findCubeMappingModelsByField(table, baseCloumnCode); if(cubeMappingModels != null && cubeMappingModels.size() > 0){ for(CubeMappingModel cubeMappingModel :cubeMappingModels){ // System.out.println("维度code = " + cubeMappingModel.getDimensionCode() + ",维度类型:" + cubeMappingModel.getDataType() + ",值=" + baseCloumnValue ); if(cubeMappingModel.getParentId() == null){ source.putAll(dimensionDataExtendToMap(cubeMappingModel, cubeMappingModel.getDimensionCode(), baseCloumnValue)); }else { // 子集成员 //这个 可以采用保存到redis 方式减少数据库压力 CubeMappingModel mappingModel = cubeMappingService.findParentDimension(cubeMappingModel.getParentId()); String parentCode = mappingModel.getDimensionCode(); cubeMappingModel.setParentCode(parentCode); cubeMappingModel.setChildSaveType(mappingModel.getChildSaveType()); if(mappingModel != null){ Map childMap = new HashMap<>(); childMap = dimensionMemberDataExtendToMap(cubeMappingModel,baseCloumnValue,subRowKey,profileId); if(source.get(parentCode) != null ){ Map parentMap = (Map) source.get(parentCode); parentMap.putAll((Map)childMap.get(parentCode)); source.put(parentCode,parentMap); }else { source.put(parentCode,childMap.get(parentCode)); } } } } } } } sourcesList.add(source); } elasticSearchUtil.bulkIndex(index,type,sourcesList); } else { throw new Exception("索引不存在"); } } }else if (action.contains(action_delAll)) { if (dataMap.containsKey("cubeId")) { Cube cube = cubeService.findOne(Integer.parseInt(cubeId)); if (null != cube) { elasticSearchUtil.deleteByField(cube.getIndexName(), cube.getIndexType(), "_index", cube.getIndexName()); } else { throw new Exception("索引不存在"); } } } }catch (ParseException e){ logger.debug("elasticSearch 执行失败"); e.printStackTrace(); e.getMessage(); } catch (Exception e) { logger.debug("数据解析异常"); e.printStackTrace(); } } /** * 维度数据扩展 转map * @param cubeMappingModel * @param cloumnCode * @return */ public Map dimensionDataExtendToMap(CubeMappingModel cubeMappingModel,String cloumnCode,String baseCloumnValue) throws Exception { String dataType = cubeMappingModel.getDataType(); int dataGetType = cubeMappingModel.getDataGetType(); String relationFieldId = cubeMappingModel.getRelationFieldId(); String relationDataFieldId = cubeMappingModel.getRelationDataFieldId(); String dict = cubeMappingModel.getDict(); String algorithm = cubeMappingModel.getAlgorithm(); Map source = new HashMap<>(); //字典扩展 if(StringUtils.isNotEmpty(dict) && StringUtils.isEmpty(algorithm)){ source = extendDictData(source,cloumnCode,dict,baseCloumnValue); }else if(StringUtils.isNotEmpty(algorithm)){ // //通过 反射方式进行 后续开放 暂时字典redis不通 // FuncHelper funcHelper = new FuncHelper(); // algorithmParm = algorithmParm.replace("value",cloumnCode); // Object data = funcHelper.function(algorithm, algorithmParm.split(",")); // if(StringUtils.isNotEmpty(dict)){ // source = extendDictData(source,cloumnCode,dict,data.toString()); // }else{ // source.put(cloumnCode,data); // } }else if(StringUtils.isNotEmpty(relationFieldId) && dataGetType == 3 ) {// 关联获取 String value = relevanceFunc.getRelationFieldVal(relationDataFieldId, relationFieldId, baseCloumnValue); source.put(cloumnCode,value); }else { source.put(cloumnCode,dataConver(dataType,baseCloumnValue)); } return source; } /** * 维度成员数据扩展 转map * @param cubeMappingModel * @param baseCloumnValue * @param subRowKey * @return */ public Map dimensionMemberDataExtendToMap(CubeMappingModel cubeMappingModel,String baseCloumnValue,String subRowKey,String profileId) throws Exception { try { int parentId = cubeMappingModel.getParentId(); int childSaveType = 1; if(cubeMappingModel.getChildSaveType() != null ){ childSaveType = cubeMappingModel.getChildSaveType(); } String parentCode = cubeMappingModel.getParentCode(); Map source = new HashMap<>(); String cloumnCode = cubeMappingModel.getDimensionCode(); String dataType = cubeMappingModel.getDataType(); String dict = cubeMappingModel.getDict(); String algorithm = cubeMappingModel.getAlgorithm(); String algorithmParm = cubeMappingModel.getParm(); String relationFieldId = cubeMappingModel.getRelationFieldId(); int dataGetType = cubeMappingModel.getDataGetType(); String index = cubeMappingModel.getIndexName(); String type = cubeMappingModel.getIndexType(); if(childSaveType == 1 ){//对象方式 Map objChildMap = new HashMap<>(); if(StringUtils.isNotEmpty(dict)){ objChildMap = extendDictData(objChildMap,cloumnCode,dict,baseCloumnValue); source.put(parentCode,objChildMap); }else if(StringUtils.isNotEmpty(algorithm) && dataGetType == 2){//算法获取 // //通过 反射方式进行 后续开放 暂时字典redis不通 // FuncHelper funcHelper = new FuncHelper(); // algorithmParm = algorithmParm.replace("value",cloumnCode); // Object data = funcHelper.function(algorithm, algorithmParm.split(",")); // if(StringUtils.isNotEmpty(dict)){ // objChildMap = extendDictData(objChildMap,cloumnCode,dict,data.toString()); // }else{ // objChildMap.put(cloumnCode,data); // } source.put(parentCode,objChildMap); }else if(StringUtils.isNotEmpty(relationFieldId) && dataGetType == 3 ) {// 关联获取 String value = relevanceFunc.getRelationFieldVal(cubeMappingModel.getRelationDataFieldId(), relationFieldId, baseCloumnValue); objChildMap.put(cloumnCode,dataConver(cubeMappingModel.getDataType(),value)); // 是否存在二级关联 List secondMappingList = cubeMappingService.findSencodRelationDimension(cubeMappingModel.getCubeId(), cubeMappingModel.getId()); if(secondMappingList != null && secondMappingList.size() > 0){ for(CubeMappingModel cubeMappingSecond : secondMappingList){ String secondRelationValue = relevanceFunc.getRelationFieldVal(cubeMappingSecond.getRelationDataFieldId(), cubeMappingSecond.getRelationFieldId(), value); objChildMap.put(cubeMappingSecond.getDimensionCode(),dataConver(cubeMappingSecond.getDataType(),secondRelationValue)); } } }else { objChildMap.put(cloumnCode,dataConver(dataType,baseCloumnValue)); } source.put(parentCode,objChildMap); }else if(childSaveType == 2 ){//nested 方式 CubeMappingModel cubeMappingModelPri = cubeMappingService.findCubeMappingPrimary(parentId); String primaryKeyCode = ""; if(cubeMappingModelPri != null ){ primaryKeyCode = cubeMappingModelPri.getDimensionCode(); }else { return null; } List> nestedList = new ArrayList<>(); //查出历史数据 然后组合保存 Map oldMataMap = elasticSearchUtil.findById(index, type, profileId); if(oldMataMap != null && oldMataMap.size() > 0){ //组装 子集历史数据,更改当前字段值 在添加 List> childList = (List>)oldMataMap.get(parentCode); if(childList != null && childList.size() > 0){ boolean isexist = false; for(Map map : childList){ if(subRowKey.equals(map.get(primaryKeyCode).toString())){ map.put(primaryKeyCode,subRowKey); map.put(cloumnCode,dataConver(dataType,baseCloumnValue)); if(StringUtils.isNotEmpty(dict)){ map = extendDictData(map,cloumnCode,dict,baseCloumnValue); }else if(StringUtils.isNotEmpty(algorithm)){ //其他算法 反射方式 todo } isexist = true; } nestedList.add(map); } if( !isexist){ Map map = new HashMap<>(); map.put(primaryKeyCode,subRowKey); map.put(cloumnCode,dataConver(dataType,baseCloumnValue)); if(StringUtils.isNotEmpty(dict)){ map = extendDictData(map,cloumnCode,dict,baseCloumnValue); }else if(StringUtils.isNotEmpty(algorithm)){ //其他算法 } nestedList.add(map); } source.put(parentCode, nestedList); }else{ Map map = new HashMap<>(); map.put(primaryKeyCode,subRowKey); map.put(cloumnCode,dataConver(dataType,baseCloumnValue)); nestedList.add(map); source.put(parentCode, nestedList); } }else{ throw new Exception("没有找到数据,无法更新"); } } return source; }catch (Exception e){ throw e; } } /** * 字典数据扩展 * @param source 数据集合 * @param cloumnCode 列code * @param dict 字典ID * @param code 字典编码 * @return */ public Map extendDictData(Map source ,String cloumnCode,String dict,String code){ DictFunc dictFunc = new DictFunc(); String value = ""; String param[] = {dict,code}; // value = dictFunc.execute(param); value = cloumnCode + "测试值"; source.put(cloumnCode,code); source.put(cloumnCode + "_name",value); return source; } // /** // * 年龄扩展年龄段 及年龄段字典 // * @param source 数据集合 // * @param cloumnCode 列code // * @param dict 字典ID // * @param sourceValue 年龄 // * @return // */ // public Map extendAgeGroupData(Map source ,String cloumnCode,String dict,String sourceValue){ // String value = ""; // DictFunc dictFunc = new DictFunc(); // AgeGroupFunc ageGroupFunc = new AgeGroupFunc(); // String ageGroup = ageGroupFunc.execute(Integer.valueOf(sourceValue)); // String param[] = {dict,ageGroup}; //// value = dictFunc.execute(param); // value = "年龄段=" + ageGroup; // if(StringUtils.isNotEmpty(dict)){ // source.put(cloumnCode,ageGroup); // source.put(cloumnCode + "Name",value); // } // return source; // } // /** // * 区域扩展 // * @param source 数据集合 // * @param cloumnCode 列code // * @param algorithmParm 算法参数 // * @param sourceValue 年龄 // * @return // */ // public Map extendDivisionData(Map source ,String cloumnCode,String algorithmParm,String sourceValue){ // DivisionFunc divisionFunc = new DivisionFunc(); // String level = "1"; // String divisionVal = ""; // if(cloumnCode.toLowerCase().equals("town")){ // level = "1"; // divisionVal = "婺源县"; // }else if(cloumnCode.toLowerCase().equals("city")){ // level = "2"; // divisionVal = "上饶市"; // }else if(cloumnCode.toLowerCase().equals("province")){ // level = "3"; // divisionVal = "江西省"; // } // String divisionParam[] = {sourceValue,level}; //// divisionVal = divisionFunc.execute(divisionParam); // source.put(cloumnCode,divisionVal); // return source; // } /** * 添加修改 数据 * @param index * @param type * @param rowKey * @param source * @throws ParseException */ public void saveElasticSearchData(String index,String type,String rowKey,Map source) throws ParseException { Map data = elasticSearchUtil.findById(index, type, rowKey); if(data != null){ elasticSearchUtil.update(index, type,rowKey,source); }else { source.put("extra_total","1"); elasticSearchUtil.index(index, type,source); } } /** * 数据类型转换 * @param dataType * @param keyValue */ public Object dataConver(String dataType,String keyValue) throws Exception{ Object value = null; NumberFormat nf = NumberFormat.getInstance(); try { if(StringUtils.isNotEmpty(keyValue)){ dataType = dataType.toLowerCase(); if(dataType.equals("string")){ value = keyValue; }else if(dataType.equals("integer")){ int intValue = Integer.valueOf(keyValue); value = intValue; }else if(dataType.equals("double")){ nf.setGroupingUsed(false); nf.setMaximumFractionDigits(2); double doubleValue = Double.valueOf(keyValue); value = doubleValue; }else if(dataType.equals("date")){ Date dateValue = null; if(keyValue.length() > 10){ if(keyValue.contains("-") ){ //2016-08-04T00:00:00Z+0800 keyValue = keyValue.substring(0,19); keyValue = keyValue.replace('T',' '); dateValue = DateUtil.parseDate(keyValue, DateUtil.DEFAULT_YMDHMSDATE_FORMAT); }else { //时间戳 1531130451000 dateValue = DateUtil.toDateFromTime(keyValue); } }else { dateValue = DateUtil.parseDate(keyValue, DateUtil.DEFAULT_DATE_YMD_FORMAT); } //es 保存是少8小时 Calendar ca = Calendar.getInstance(); ca.setTime(dateValue); ca.add(Calendar.HOUR_OF_DAY, 8); value = ca.getTime(); } } }catch (Exception e){ e.printStackTrace(); logger.debug("数据转换异常!"); } return value; } public String converMapObject(Object object){ Object[] obj = (Object[]) object; if(obj.length > 0){ return obj[0].toString(); } return ""; } }