123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530 |
- package com.yihu.quota.service.cube;
- import com.fasterxml.jackson.databind.ObjectMapper;
- import com.yihu.ehr.elasticsearch.ElasticSearchUtil;
- import com.yihu.ehr.util.datetime.DateUtil;
- import com.yihu.quota.etl.formula.DictFunc;
- import com.yihu.quota.etl.formula.RelevanceFunc;
- import com.yihu.quota.model.cube.Cube;
- import com.yihu.quota.service.dimension.DimensionService;
- import com.yihu.quota.vo.CubeMappingModel;
- import org.apache.commons.lang.StringUtils;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.stereotype.Service;
- import java.text.NumberFormat;
- import java.text.ParseException;
- import java.util.*;
- /**
- * Created by janseny on 2018/9/18.
- */
- @Service
- public class ElasticSearchDataProcessService {
- private static Logger logger = LoggerFactory.getLogger(ElasticSearchDataProcessService.class);
- private static String dataSource_hbase = "hbase";
- private static String dataSource_mysql = "mysql";
- private static String action_del = "DeleteFamily";//删除整行
- private static String action_put = "Put"; //添加和修改单个字段值
- private static String action_delAll = "DelAll"; //删除整张表数据
- private static String action_putAll = "PutAll"; //添加整张表数据
- private static String dataSource_k = "dataSource";
- private static String database_k = "database";
- private static String table_k = "table";
- private static String id_k = "_id";
- private static String cubeId_k = "cubeId";
- private static String rowKey_k = "rowkey";
- private static String profileId_k = "profile_id";
- private static String action_k = "action";
- @Autowired
- private CubeMappingService cubeMappingService;
- @Autowired
- private ElasticSearchUtil elasticSearchUtil;
- @Autowired
- private CubeService cubeService;
- @Autowired
- private ObjectMapper objectMapper;
- @Autowired
- private RelevanceFunc relevanceFunc;
- /**
- *
- * @param data json 数据串
- */
- public void saveData(String data){
- try {
- Map<String, Object> dataMap = objectMapper.readValue(data,Map.class);
- if(dataMap.containsKey(dataSource_k)){
- String dataSource = dataMap.get(dataSource_k).toString();
- dataMap.remove(dataSource_k);
- if(dataSource.toLowerCase().equals(dataSource_hbase)){
- dataProcess(dataMap);
- }else if(dataSource.toLowerCase().equals(dataSource_mysql)){
- dataProcess(dataMap);
- }
- }
- } catch (Exception e) {
- logger.debug("json数据转换异常");
- e.getMessage();
- }
- }
- /**
- * @param dataMap
- * 如果是子集的数据 hbase 过来要指定父级数据
- * 如果是 删除 子表中的一行数据,es 这边就无法确定删除哪个数据
- */
- public void dataProcess(Map<String, Object> dataMap) throws Exception{
- String table = "";
- String rowKey = "";
- String profileId = "";
- String cubeId = "";
- String action = "";
- String database = "";
- if (dataMap.containsKey(database_k)) {
- database = dataMap.remove(database_k).toString();
- dataMap.remove(database_k);
- }
- if(dataMap.containsKey(table_k)){
- table = dataMap.get(table_k).toString();
- dataMap.remove(table_k);
- }
- if(dataMap.containsKey(rowKey_k)){
- rowKey = dataMap.get(rowKey_k).toString();
- dataMap.remove(rowKey_k);
- }
- if(dataMap.containsKey(profileId_k)){
- profileId = dataMap.get(profileId_k).toString();
- dataMap.remove(profileId_k);
- }
- if(dataMap.containsKey(action_k)){
- action = dataMap.get(action_k).toString();
- dataMap.remove(action_k);
- }
- if(dataMap.containsKey(cubeId_k)){
- cubeId = dataMap.get(cubeId_k).toString();
- }
- if(StringUtils.isEmpty(profileId)){
- profileId = rowKey;
- }
- String subRowKey = rowKey;
- try {
- String baseCloumnValue = null;
- if(action.equals(action_put)){
- for(String baseCloumnCode : dataMap.keySet()){
- if(dataMap.get(baseCloumnCode)!= null){
- baseCloumnValue = dataMap.get(baseCloumnCode).toString();
- List<CubeMappingModel> cubeMappingModels = cubeMappingService.findCubeMappingModelsByField(table, baseCloumnCode);
- if(cubeMappingModels != null && cubeMappingModels.size() > 0){
- for(CubeMappingModel cubeMappingModel :cubeMappingModels){
- // System.out.println("维度code = " + cubeMappingModel.getDimensionCode() + ",维度类型:" + cubeMappingModel.getDataType() + ",值=" + baseCloumnValue );
- Map<String, Object> source = new HashMap<>();
- if(cubeMappingModel.getParentId() == null){
- source.put(id_k,rowKey);
- source.put(rowKey_k,rowKey);
- source.putAll(dimensionDataExtendToMap(cubeMappingModel,cubeMappingModel.getDimensionCode(),baseCloumnValue));
- }else { // 子集成员
- rowKey = profileId;
- source.put(id_k,profileId);
- source.put(rowKey_k,profileId);
- CubeMappingModel mappingModel = cubeMappingService.findParentDimension(cubeMappingModel.getParentId());
- String parentCode = mappingModel.getDimensionCode();
- cubeMappingModel.setParentCode(parentCode);
- cubeMappingModel.setChildSaveType(mappingModel.getChildSaveType());
- source.putAll(dimensionMemberDataExtendToMap(cubeMappingModel,baseCloumnValue,subRowKey,profileId));
- }
- String index = cubeMappingModel.getIndexName();
- String type = cubeMappingModel.getIndexType();
- saveElasticSearchData(index, type,rowKey,source);
- }
- }
- }
- }
- }else if(action.equals(action_del)){
- //一个表只能对应到一个 索引type
- Cube cube = cubeMappingService.findCubeByTableCode(table);
- if(cube != null){
- elasticSearchUtil.delete(cube.getIndexName(),cube.getIndexType(),rowKey);
- }else {
- throw new Exception("视图,表不存在");
- }
- }else if (action.equals(action_putAll)) {
- if (dataMap.containsKey("cubeId")) {
- //采用redis
- Cube cube = cubeService.findOne(Integer.parseInt(cubeId));
- if (null != cube) {
- String index = cube.getIndexName();
- String type = cube.getIndexType();
- List<Map<String,Object>> dataList = (List<Map<String,Object>>)dataMap.get("dataList");
- List<Map<String,Object>> sourcesList = new ArrayList<>();
- for(Map<String,Object> oneDataMap : dataList){
- Map<String, Object> source = new HashMap<>();
- for(String baseCloumnCode : oneDataMap.keySet()){
- source.put(id_k, oneDataMap.get(rowKey_k).toString());
- source.put(rowKey_k,oneDataMap.get(rowKey_k).toString());
- // System.out.println("列:" + baseCloumnCode );
- if(oneDataMap.get(baseCloumnCode)!= null){
- baseCloumnValue = oneDataMap.get(baseCloumnCode).toString();
- //这个 可以采用保存到redis 方式减少数据库压力
- List<CubeMappingModel> cubeMappingModels = cubeMappingService.findCubeMappingModelsByField(table, baseCloumnCode);
- if(cubeMappingModels != null && cubeMappingModels.size() > 0){
- for(CubeMappingModel cubeMappingModel :cubeMappingModels){
- // System.out.println("维度code = " + cubeMappingModel.getDimensionCode() + ",维度类型:" + cubeMappingModel.getDataType() + ",值=" + baseCloumnValue );
- if(cubeMappingModel.getParentId() == null){
- source.putAll(dimensionDataExtendToMap(cubeMappingModel, cubeMappingModel.getDimensionCode(), baseCloumnValue));
- }else { // 子集成员
- //这个 可以采用保存到redis 方式减少数据库压力
- CubeMappingModel mappingModel = cubeMappingService.findParentDimension(cubeMappingModel.getParentId());
- String parentCode = mappingModel.getDimensionCode();
- cubeMappingModel.setParentCode(parentCode);
- cubeMappingModel.setChildSaveType(mappingModel.getChildSaveType());
- if(mappingModel != null){
- Map<String,Object> childMap = new HashMap<>();
- childMap = dimensionMemberDataExtendToMap(cubeMappingModel,baseCloumnValue,subRowKey,profileId);
- if(source.get(parentCode) != null ){
- Map<String, Object> parentMap = (Map<String, Object>) source.get(parentCode);
- parentMap.putAll((Map<String, Object>)childMap.get(parentCode));
- source.put(parentCode,parentMap);
- }else {
- source.put(parentCode,childMap.get(parentCode));
- }
- }
- }
- }
- }
- }
- }
- sourcesList.add(source);
- }
- elasticSearchUtil.bulkIndex(index,type,sourcesList);
- } else {
- throw new Exception("索引不存在");
- }
- }
- }else if (action.contains(action_delAll)) {
- if (dataMap.containsKey("cubeId")) {
- Cube cube = cubeService.findOne(Integer.parseInt(cubeId));
- if (null != cube) {
- elasticSearchUtil.deleteByField(cube.getIndexName(), cube.getIndexType(), "_index", cube.getIndexName());
- } else {
- throw new Exception("索引不存在");
- }
- }
- }
- }catch (ParseException e){
- logger.debug("elasticSearch 执行失败");
- e.printStackTrace();
- e.getMessage();
- } catch (Exception e) {
- logger.debug("数据解析异常");
- e.printStackTrace();
- }
- }
- /**
- * 维度数据扩展 转map
- * @param cubeMappingModel
- * @param cloumnCode
- * @return
- */
- public Map<String,Object> dimensionDataExtendToMap(CubeMappingModel cubeMappingModel,String cloumnCode,String baseCloumnValue) throws Exception {
- String dataType = cubeMappingModel.getDataType();
- int dataGetType = cubeMappingModel.getDataGetType();
- String relationFieldId = cubeMappingModel.getRelationFieldId();
- String relationDataFieldId = cubeMappingModel.getRelationDataFieldId();
- String dict = cubeMappingModel.getDict();
- String algorithm = cubeMappingModel.getAlgorithm();
- Map<String, Object> source = new HashMap<>();
- //字典扩展
- if(StringUtils.isNotEmpty(dict) && StringUtils.isEmpty(algorithm)){
- source = extendDictData(source,cloumnCode,dict,baseCloumnValue);
- }else if(StringUtils.isNotEmpty(algorithm)){
- // //通过 反射方式进行 后续开放 暂时字典redis不通
- // FuncHelper funcHelper = new FuncHelper();
- // algorithmParm = algorithmParm.replace("value",cloumnCode);
- // Object data = funcHelper.function(algorithm, algorithmParm.split(","));
- // if(StringUtils.isNotEmpty(dict)){
- // source = extendDictData(source,cloumnCode,dict,data.toString());
- // }else{
- // source.put(cloumnCode,data);
- // }
- }else if(StringUtils.isNotEmpty(relationFieldId) && dataGetType == 3 ) {// 关联获取
- String value = relevanceFunc.getRelationFieldVal(relationDataFieldId, relationFieldId, baseCloumnValue);
- source.put(cloumnCode,value);
- }else {
- source.put(cloumnCode,dataConver(dataType,baseCloumnValue));
- }
- return source;
- }
- /**
- * 维度成员数据扩展 转map
- * @param cubeMappingModel
- * @param baseCloumnValue
- * @param subRowKey
- * @return
- */
- public Map<String,Object> dimensionMemberDataExtendToMap(CubeMappingModel cubeMappingModel,String baseCloumnValue,String subRowKey,String profileId) throws Exception {
- try {
- int parentId = cubeMappingModel.getParentId();
- int childSaveType = 1;
- if(cubeMappingModel.getChildSaveType() != null ){
- childSaveType = cubeMappingModel.getChildSaveType();
- }
- String parentCode = cubeMappingModel.getParentCode();
- Map<String, Object> source = new HashMap<>();
- String cloumnCode = cubeMappingModel.getDimensionCode();
- String dataType = cubeMappingModel.getDataType();
- String dict = cubeMappingModel.getDict();
- String algorithm = cubeMappingModel.getAlgorithm();
- String algorithmParm = cubeMappingModel.getParm();
- String relationFieldId = cubeMappingModel.getRelationFieldId();
- int dataGetType = cubeMappingModel.getDataGetType();
- String index = cubeMappingModel.getIndexName();
- String type = cubeMappingModel.getIndexType();
- if(childSaveType == 1 ){//对象方式
- Map<String, Object> objChildMap = new HashMap<>();
- if(StringUtils.isNotEmpty(dict)){
- objChildMap = extendDictData(objChildMap,cloumnCode,dict,baseCloumnValue);
- source.put(parentCode,objChildMap);
- }else if(StringUtils.isNotEmpty(algorithm) && dataGetType == 2){//算法获取
- // //通过 反射方式进行 后续开放 暂时字典redis不通
- // FuncHelper funcHelper = new FuncHelper();
- // algorithmParm = algorithmParm.replace("value",cloumnCode);
- // Object data = funcHelper.function(algorithm, algorithmParm.split(","));
- // if(StringUtils.isNotEmpty(dict)){
- // objChildMap = extendDictData(objChildMap,cloumnCode,dict,data.toString());
- // }else{
- // objChildMap.put(cloumnCode,data);
- // }
- source.put(parentCode,objChildMap);
- }else if(StringUtils.isNotEmpty(relationFieldId) && dataGetType == 3 ) {// 关联获取
- String value = relevanceFunc.getRelationFieldVal(cubeMappingModel.getRelationDataFieldId(), relationFieldId, baseCloumnValue);
- objChildMap.put(cloumnCode,dataConver(cubeMappingModel.getDataType(),value));
- // 是否存在二级关联
- List<CubeMappingModel> secondMappingList = cubeMappingService.findSencodRelationDimension(cubeMappingModel.getCubeId(), cubeMappingModel.getId());
- if(secondMappingList != null && secondMappingList.size() > 0){
- for(CubeMappingModel cubeMappingSecond : secondMappingList){
- String secondRelationValue = relevanceFunc.getRelationFieldVal(cubeMappingSecond.getRelationDataFieldId(), cubeMappingSecond.getRelationFieldId(), value);
- objChildMap.put(cubeMappingSecond.getDimensionCode(),dataConver(cubeMappingSecond.getDataType(),secondRelationValue));
- }
- }
- }else {
- objChildMap.put(cloumnCode,dataConver(dataType,baseCloumnValue));
- }
- source.put(parentCode,objChildMap);
- }else if(childSaveType == 2 ){//nested 方式
- CubeMappingModel cubeMappingModelPri = cubeMappingService.findCubeMappingPrimary(parentId);
- String primaryKeyCode = "";
- if(cubeMappingModelPri != null ){
- primaryKeyCode = cubeMappingModelPri.getDimensionCode();
- }else {
- return null;
- }
- List<Map<String,Object>> nestedList = new ArrayList<>();
- //查出历史数据 然后组合保存
- Map<String, Object> oldMataMap = elasticSearchUtil.findById(index, type, profileId);
- if(oldMataMap != null && oldMataMap.size() > 0){
- //组装 子集历史数据,更改当前字段值 在添加
- List<Map<String, Object>> childList = (List<Map<String, Object>>)oldMataMap.get(parentCode);
- if(childList != null && childList.size() > 0){
- boolean isexist = false;
- for(Map<String, Object> map : childList){
- if(subRowKey.equals(map.get(primaryKeyCode).toString())){
- map.put(primaryKeyCode,subRowKey);
- map.put(cloumnCode,dataConver(dataType,baseCloumnValue));
- if(StringUtils.isNotEmpty(dict)){
- map = extendDictData(map,cloumnCode,dict,baseCloumnValue);
- }else if(StringUtils.isNotEmpty(algorithm)){
- //其他算法 反射方式 todo
- }
- isexist = true;
- }
- nestedList.add(map);
- }
- if( !isexist){
- Map<String,Object> map = new HashMap<>();
- map.put(primaryKeyCode,subRowKey);
- map.put(cloumnCode,dataConver(dataType,baseCloumnValue));
- if(StringUtils.isNotEmpty(dict)){
- map = extendDictData(map,cloumnCode,dict,baseCloumnValue);
- }else if(StringUtils.isNotEmpty(algorithm)){
- //其他算法
- }
- nestedList.add(map);
- }
- source.put(parentCode, nestedList);
- }else{
- Map<String,Object> map = new HashMap<>();
- map.put(primaryKeyCode,subRowKey);
- map.put(cloumnCode,dataConver(dataType,baseCloumnValue));
- nestedList.add(map);
- source.put(parentCode, nestedList);
- }
- }else{
- throw new Exception("没有找到数据,无法更新");
- }
- }
- return source;
- }catch (Exception e){
- throw e;
- }
- }
- /**
- * 字典数据扩展
- * @param source 数据集合
- * @param cloumnCode 列code
- * @param dict 字典ID
- * @param code 字典编码
- * @return
- */
- public Map<String, Object> extendDictData(Map<String, Object> source ,String cloumnCode,String dict,String code){
- DictFunc dictFunc = new DictFunc();
- String value = "";
- String param[] = {dict,code};
- // value = dictFunc.execute(param);
- value = cloumnCode + "测试值";
- source.put(cloumnCode,code);
- source.put(cloumnCode + "_name",value);
- return source;
- }
- // /**
- // * 年龄扩展年龄段 及年龄段字典
- // * @param source 数据集合
- // * @param cloumnCode 列code
- // * @param dict 字典ID
- // * @param sourceValue 年龄
- // * @return
- // */
- // public Map<String, Object> extendAgeGroupData(Map<String, Object> source ,String cloumnCode,String dict,String sourceValue){
- // String value = "";
- // DictFunc dictFunc = new DictFunc();
- // AgeGroupFunc ageGroupFunc = new AgeGroupFunc();
- // String ageGroup = ageGroupFunc.execute(Integer.valueOf(sourceValue));
- // String param[] = {dict,ageGroup};
- //// value = dictFunc.execute(param);
- // value = "年龄段=" + ageGroup;
- // if(StringUtils.isNotEmpty(dict)){
- // source.put(cloumnCode,ageGroup);
- // source.put(cloumnCode + "Name",value);
- // }
- // return source;
- // }
- // /**
- // * 区域扩展
- // * @param source 数据集合
- // * @param cloumnCode 列code
- // * @param algorithmParm 算法参数
- // * @param sourceValue 年龄
- // * @return
- // */
- // public Map<String, Object> extendDivisionData(Map<String, Object> source ,String cloumnCode,String algorithmParm,String sourceValue){
- // DivisionFunc divisionFunc = new DivisionFunc();
- // String level = "1";
- // String divisionVal = "";
- // if(cloumnCode.toLowerCase().equals("town")){
- // level = "1";
- // divisionVal = "婺源县";
- // }else if(cloumnCode.toLowerCase().equals("city")){
- // level = "2";
- // divisionVal = "上饶市";
- // }else if(cloumnCode.toLowerCase().equals("province")){
- // level = "3";
- // divisionVal = "江西省";
- // }
- // String divisionParam[] = {sourceValue,level};
- //// divisionVal = divisionFunc.execute(divisionParam);
- // source.put(cloumnCode,divisionVal);
- // return source;
- // }
- /**
- * 添加修改 数据
- * @param index
- * @param type
- * @param rowKey
- * @param source
- * @throws ParseException
- */
- public void saveElasticSearchData(String index,String type,String rowKey,Map<String, Object> source) throws ParseException {
- Map<String, Object> data = elasticSearchUtil.findById(index, type, rowKey);
- if(data != null){
- elasticSearchUtil.update(index, type,rowKey,source);
- }else {
- source.put("extra_total","1");
- elasticSearchUtil.index(index, type,source);
- }
- }
- /**
- * 数据类型转换
- * @param dataType
- * @param keyValue
- */
- public Object dataConver(String dataType,String keyValue) throws Exception{
- Object value = null;
- NumberFormat nf = NumberFormat.getInstance();
- try {
- if(StringUtils.isNotEmpty(keyValue)){
- dataType = dataType.toLowerCase();
- if(dataType.equals("string")){
- value = keyValue;
- }else if(dataType.equals("integer")){
- int intValue = Integer.valueOf(keyValue);
- value = intValue;
- }else if(dataType.equals("double")){
- nf.setGroupingUsed(false);
- nf.setMaximumFractionDigits(2);
- double doubleValue = Double.valueOf(keyValue);
- value = doubleValue;
- }else if(dataType.equals("date")){
- Date dateValue = null;
- if(keyValue.length() > 10){
- if(keyValue.contains("-") ){
- //2016-08-04T00:00:00Z+0800
- keyValue = keyValue.substring(0,19);
- keyValue = keyValue.replace('T',' ');
- dateValue = DateUtil.parseDate(keyValue, DateUtil.DEFAULT_YMDHMSDATE_FORMAT);
- }else {
- //时间戳 1531130451000
- dateValue = DateUtil.toDateFromTime(keyValue);
- }
- }else {
- dateValue = DateUtil.parseDate(keyValue, DateUtil.DEFAULT_DATE_YMD_FORMAT);
- }
- //es 保存是少8小时
- Calendar ca = Calendar.getInstance();
- ca.setTime(dateValue);
- ca.add(Calendar.HOUR_OF_DAY, 8);
- value = ca.getTime();
- }
- }
- }catch (Exception e){
- e.printStackTrace();
- logger.debug("数据转换异常!");
- }
- return value;
- }
- public String converMapObject(Object object){
- Object[] obj = (Object[]) object;
- if(obj.length > 0){
- return obj[0].toString();
- }
- return "";
- }
- }
|