ElasticSearchDataProcessService.java 25 KB


  1. package com.yihu.quota.service.cube;
  2. import com.fasterxml.jackson.databind.ObjectMapper;
  3. import com.yihu.ehr.elasticsearch.ElasticSearchUtil;
  4. import com.yihu.ehr.util.datetime.DateUtil;
  5. import com.yihu.quota.etl.formula.DictFunc;
  6. import com.yihu.quota.etl.formula.RelevanceFunc;
  7. import com.yihu.quota.model.cube.Cube;
  8. import com.yihu.quota.service.dimension.DimensionService;
  9. import com.yihu.quota.vo.CubeMappingModel;
  10. import org.apache.commons.lang.StringUtils;
  11. import org.slf4j.Logger;
  12. import org.slf4j.LoggerFactory;
  13. import org.springframework.beans.factory.annotation.Autowired;
  14. import org.springframework.stereotype.Service;
  15. import java.text.NumberFormat;
  16. import java.text.ParseException;
  17. import java.util.*;
  18. /**
  19. * Created by janseny on 2018/9/18.
  20. */
  21. @Service
  22. public class ElasticSearchDataProcessService {
  23. private static Logger logger = LoggerFactory.getLogger(ElasticSearchDataProcessService.class);
  24. private static String dataSource_hbase = "hbase";
  25. private static String dataSource_mysql = "mysql";
  26. private static String action_del = "DeleteFamily";//删除整行
  27. private static String action_put = "Put"; //添加和修改单个字段值
  28. private static String action_delAll = "DelAll"; //删除整张表数据
  29. private static String action_putAll = "PutAll"; //添加整张表数据
  30. private static String dataSource_k = "dataSource";
  31. private static String database_k = "database";
  32. private static String table_k = "table";
  33. private static String id_k = "_id";
  34. private static String cubeId_k = "cubeId";
  35. private static String rowKey_k = "rowkey";
  36. private static String profileId_k = "profile_id";
  37. private static String action_k = "action";
  38. @Autowired
  39. private CubeMappingService cubeMappingService;
  40. @Autowired
  41. private ElasticSearchUtil elasticSearchUtil;
  42. @Autowired
  43. private CubeService cubeService;
  44. @Autowired
  45. private ObjectMapper objectMapper;
  46. @Autowired
  47. private RelevanceFunc relevanceFunc;
  48. /**
  49. *
  50. * @param data json 数据串
  51. */
  52. public void saveData(String data){
  53. try {
  54. Map<String, Object> dataMap = objectMapper.readValue(data,Map.class);
  55. if(dataMap.containsKey(dataSource_k)){
  56. String dataSource = dataMap.get(dataSource_k).toString();
  57. dataMap.remove(dataSource_k);
  58. if(dataSource.toLowerCase().equals(dataSource_hbase)){
  59. dataProcess(dataMap);
  60. }else if(dataSource.toLowerCase().equals(dataSource_mysql)){
  61. dataProcess(dataMap);
  62. }
  63. }
  64. } catch (Exception e) {
  65. logger.debug("json数据转换异常");
  66. e.getMessage();
  67. }
  68. }
  69. /**
  70. * @param dataMap
  71. * 如果是子集的数据 hbase 过来要指定父级数据
  72. * 如果是 删除 子表中的一行数据,es 这边就无法确定删除哪个数据
  73. */
  74. public void dataProcess(Map<String, Object> dataMap) throws Exception{
  75. String table = "";
  76. String rowKey = "";
  77. String profileId = "";
  78. String cubeId = "";
  79. String action = "";
  80. String database = "";
  81. if (dataMap.containsKey(database_k)) {
  82. database = dataMap.remove(database_k).toString();
  83. dataMap.remove(database_k);
  84. }
  85. if(dataMap.containsKey(table_k)){
  86. table = dataMap.get(table_k).toString();
  87. dataMap.remove(table_k);
  88. }
  89. if(dataMap.containsKey(rowKey_k)){
  90. rowKey = dataMap.get(rowKey_k).toString();
  91. dataMap.remove(rowKey_k);
  92. }
  93. if(dataMap.containsKey(profileId_k)){
  94. profileId = dataMap.get(profileId_k).toString();
  95. dataMap.remove(profileId_k);
  96. }
  97. if(dataMap.containsKey(action_k)){
  98. action = dataMap.get(action_k).toString();
  99. dataMap.remove(action_k);
  100. }
  101. if(dataMap.containsKey(cubeId_k)){
  102. cubeId = dataMap.get(cubeId_k).toString();
  103. }
  104. if(StringUtils.isEmpty(profileId)){
  105. profileId = rowKey;
  106. }
  107. String subRowKey = rowKey;
  108. try {
  109. String baseCloumnValue = null;
  110. if(action.equals(action_put)){
  111. for(String baseCloumnCode : dataMap.keySet()){
  112. if(dataMap.get(baseCloumnCode)!= null){
  113. baseCloumnValue = dataMap.get(baseCloumnCode).toString();
  114. List<CubeMappingModel> cubeMappingModels = cubeMappingService.findCubeMappingModelsByField(table, baseCloumnCode);
  115. if(cubeMappingModels != null && cubeMappingModels.size() > 0){
  116. for(CubeMappingModel cubeMappingModel :cubeMappingModels){
  117. // System.out.println("维度code = " + cubeMappingModel.getDimensionCode() + ",维度类型:" + cubeMappingModel.getDataType() + ",值=" + baseCloumnValue );
  118. Map<String, Object> source = new HashMap<>();
  119. if(cubeMappingModel.getParentId() == null){
  120. source.put(id_k,rowKey);
  121. source.put(rowKey_k,rowKey);
  122. source.putAll(dimensionDataExtendToMap(cubeMappingModel,cubeMappingModel.getDimensionCode(),baseCloumnValue));
  123. }else { // 子集成员
  124. rowKey = profileId;
  125. source.put(id_k,profileId);
  126. source.put(rowKey_k,profileId);
  127. CubeMappingModel mappingModel = cubeMappingService.findParentDimension(cubeMappingModel.getParentId());
  128. String parentCode = mappingModel.getDimensionCode();
  129. cubeMappingModel.setParentCode(parentCode);
  130. cubeMappingModel.setChildSaveType(mappingModel.getChildSaveType());
  131. source.putAll(dimensionMemberDataExtendToMap(cubeMappingModel,baseCloumnValue,subRowKey,profileId));
  132. }
  133. String index = cubeMappingModel.getIndexName();
  134. String type = cubeMappingModel.getIndexType();
  135. saveElasticSearchData(index, type,rowKey,source);
  136. }
  137. }
  138. }
  139. }
  140. }else if(action.equals(action_del)){
  141. //一个表只能对应到一个 索引type
  142. Cube cube = cubeMappingService.findCubeByTableCode(table);
  143. if(cube != null){
  144. elasticSearchUtil.delete(cube.getIndexName(),cube.getIndexType(),rowKey);
  145. }else {
  146. throw new Exception("视图,表不存在");
  147. }
  148. }else if (action.equals(action_putAll)) {
  149. if (dataMap.containsKey("cubeId")) {
  150. //采用redis
  151. Cube cube = cubeService.findOne(Integer.parseInt(cubeId));
  152. if (null != cube) {
  153. String index = cube.getIndexName();
  154. String type = cube.getIndexType();
  155. List<Map<String,Object>> dataList = (List<Map<String,Object>>)dataMap.get("dataList");
  156. List<Map<String,Object>> sourcesList = new ArrayList<>();
  157. for(Map<String,Object> oneDataMap : dataList){
  158. Map<String, Object> source = new HashMap<>();
  159. for(String baseCloumnCode : oneDataMap.keySet()){
  160. source.put(id_k, oneDataMap.get(rowKey_k).toString());
  161. source.put(rowKey_k,oneDataMap.get(rowKey_k).toString());
  162. // System.out.println("列:" + baseCloumnCode );
  163. if(oneDataMap.get(baseCloumnCode)!= null){
  164. baseCloumnValue = oneDataMap.get(baseCloumnCode).toString();
  165. //这个 可以采用保存到redis 方式减少数据库压力
  166. List<CubeMappingModel> cubeMappingModels = cubeMappingService.findCubeMappingModelsByField(table, baseCloumnCode);
  167. if(cubeMappingModels != null && cubeMappingModels.size() > 0){
  168. for(CubeMappingModel cubeMappingModel :cubeMappingModels){
  169. // System.out.println("维度code = " + cubeMappingModel.getDimensionCode() + ",维度类型:" + cubeMappingModel.getDataType() + ",值=" + baseCloumnValue );
  170. if(cubeMappingModel.getParentId() == null){
  171. source.putAll(dimensionDataExtendToMap(cubeMappingModel, cubeMappingModel.getDimensionCode(), baseCloumnValue));
  172. }else { // 子集成员
  173. //这个 可以采用保存到redis 方式减少数据库压力
  174. CubeMappingModel mappingModel = cubeMappingService.findParentDimension(cubeMappingModel.getParentId());
  175. String parentCode = mappingModel.getDimensionCode();
  176. cubeMappingModel.setParentCode(parentCode);
  177. cubeMappingModel.setChildSaveType(mappingModel.getChildSaveType());
  178. if(mappingModel != null){
  179. Map<String,Object> childMap = new HashMap<>();
  180. childMap = dimensionMemberDataExtendToMap(cubeMappingModel,baseCloumnValue,subRowKey,profileId);
  181. if(source.get(parentCode) != null ){
  182. Map<String, Object> parentMap = (Map<String, Object>) source.get(parentCode);
  183. parentMap.putAll((Map<String, Object>)childMap.get(parentCode));
  184. source.put(parentCode,parentMap);
  185. }else {
  186. source.put(parentCode,childMap.get(parentCode));
  187. }
  188. }
  189. }
  190. }
  191. }
  192. }
  193. }
  194. sourcesList.add(source);
  195. }
  196. elasticSearchUtil.bulkIndex(index,type,sourcesList);
  197. } else {
  198. throw new Exception("索引不存在");
  199. }
  200. }
  201. }else if (action.contains(action_delAll)) {
  202. if (dataMap.containsKey("cubeId")) {
  203. Cube cube = cubeService.findOne(Integer.parseInt(cubeId));
  204. if (null != cube) {
  205. elasticSearchUtil.deleteByField(cube.getIndexName(), cube.getIndexType(), "_index", cube.getIndexName());
  206. } else {
  207. throw new Exception("索引不存在");
  208. }
  209. }
  210. }
  211. }catch (ParseException e){
  212. logger.debug("elasticSearch 执行失败");
  213. e.printStackTrace();
  214. e.getMessage();
  215. } catch (Exception e) {
  216. logger.debug("数据解析异常");
  217. e.printStackTrace();
  218. }
  219. }
  220. /**
  221. * 维度数据扩展 转map
  222. * @param cubeMappingModel
  223. * @param cloumnCode
  224. * @return
  225. */
  226. public Map<String,Object> dimensionDataExtendToMap(CubeMappingModel cubeMappingModel,String cloumnCode,String baseCloumnValue) throws Exception {
  227. String dataType = cubeMappingModel.getDataType();
  228. int dataGetType = cubeMappingModel.getDataGetType();
  229. String relationFieldId = cubeMappingModel.getRelationFieldId();
  230. String relationDataFieldId = cubeMappingModel.getRelationDataFieldId();
  231. String dict = cubeMappingModel.getDict();
  232. String algorithm = cubeMappingModel.getAlgorithm();
  233. Map<String, Object> source = new HashMap<>();
  234. //字典扩展
  235. if(StringUtils.isNotEmpty(dict) && StringUtils.isEmpty(algorithm)){
  236. source = extendDictData(source,cloumnCode,dict,baseCloumnValue);
  237. }else if(StringUtils.isNotEmpty(algorithm)){
  238. // //通过 反射方式进行 后续开放 暂时字典redis不通
  239. // FuncHelper funcHelper = new FuncHelper();
  240. // algorithmParm = algorithmParm.replace("value",cloumnCode);
  241. // Object data = funcHelper.function(algorithm, algorithmParm.split(","));
  242. // if(StringUtils.isNotEmpty(dict)){
  243. // source = extendDictData(source,cloumnCode,dict,data.toString());
  244. // }else{
  245. // source.put(cloumnCode,data);
  246. // }
  247. }else if(StringUtils.isNotEmpty(relationFieldId) && dataGetType == 3 ) {// 关联获取
  248. String value = relevanceFunc.getRelationFieldVal(relationDataFieldId, relationFieldId, baseCloumnValue);
  249. source.put(cloumnCode,value);
  250. }else {
  251. source.put(cloumnCode,dataConver(dataType,baseCloumnValue));
  252. }
  253. return source;
  254. }
  255. /**
  256. * 维度成员数据扩展 转map
  257. * @param cubeMappingModel
  258. * @param baseCloumnValue
  259. * @param subRowKey
  260. * @return
  261. */
  262. public Map<String,Object> dimensionMemberDataExtendToMap(CubeMappingModel cubeMappingModel,String baseCloumnValue,String subRowKey,String profileId) throws Exception {
  263. try {
  264. int parentId = cubeMappingModel.getParentId();
  265. int childSaveType = 1;
  266. if(cubeMappingModel.getChildSaveType() != null ){
  267. childSaveType = cubeMappingModel.getChildSaveType();
  268. }
  269. String parentCode = cubeMappingModel.getParentCode();
  270. Map<String, Object> source = new HashMap<>();
  271. String cloumnCode = cubeMappingModel.getDimensionCode();
  272. String dataType = cubeMappingModel.getDataType();
  273. String dict = cubeMappingModel.getDict();
  274. String algorithm = cubeMappingModel.getAlgorithm();
  275. String algorithmParm = cubeMappingModel.getParm();
  276. String relationFieldId = cubeMappingModel.getRelationFieldId();
  277. int dataGetType = cubeMappingModel.getDataGetType();
  278. String index = cubeMappingModel.getIndexName();
  279. String type = cubeMappingModel.getIndexType();
  280. if(childSaveType == 1 ){//对象方式
  281. Map<String, Object> objChildMap = new HashMap<>();
  282. if(StringUtils.isNotEmpty(dict)){
  283. objChildMap = extendDictData(objChildMap,cloumnCode,dict,baseCloumnValue);
  284. source.put(parentCode,objChildMap);
  285. }else if(StringUtils.isNotEmpty(algorithm) && dataGetType == 2){//算法获取
  286. // //通过 反射方式进行 后续开放 暂时字典redis不通
  287. // FuncHelper funcHelper = new FuncHelper();
  288. // algorithmParm = algorithmParm.replace("value",cloumnCode);
  289. // Object data = funcHelper.function(algorithm, algorithmParm.split(","));
  290. // if(StringUtils.isNotEmpty(dict)){
  291. // objChildMap = extendDictData(objChildMap,cloumnCode,dict,data.toString());
  292. // }else{
  293. // objChildMap.put(cloumnCode,data);
  294. // }
  295. source.put(parentCode,objChildMap);
  296. }else if(StringUtils.isNotEmpty(relationFieldId) && dataGetType == 3 ) {// 关联获取
  297. String value = relevanceFunc.getRelationFieldVal(cubeMappingModel.getRelationDataFieldId(), relationFieldId, baseCloumnValue);
  298. objChildMap.put(cloumnCode,dataConver(cubeMappingModel.getDataType(),value));
  299. // 是否存在二级关联
  300. List<CubeMappingModel> secondMappingList = cubeMappingService.findSencodRelationDimension(cubeMappingModel.getCubeId(), cubeMappingModel.getId());
  301. if(secondMappingList != null && secondMappingList.size() > 0){
  302. for(CubeMappingModel cubeMappingSecond : secondMappingList){
  303. String secondRelationValue = relevanceFunc.getRelationFieldVal(cubeMappingSecond.getRelationDataFieldId(), cubeMappingSecond.getRelationFieldId(), value);
  304. objChildMap.put(cubeMappingSecond.getDimensionCode(),dataConver(cubeMappingSecond.getDataType(),secondRelationValue));
  305. }
  306. }
  307. }else {
  308. objChildMap.put(cloumnCode,dataConver(dataType,baseCloumnValue));
  309. }
  310. source.put(parentCode,objChildMap);
  311. }else if(childSaveType == 2 ){//nested 方式
  312. CubeMappingModel cubeMappingModelPri = cubeMappingService.findCubeMappingPrimary(parentId);
  313. String primaryKeyCode = "";
  314. if(cubeMappingModelPri != null ){
  315. primaryKeyCode = cubeMappingModelPri.getDimensionCode();
  316. }else {
  317. return null;
  318. }
  319. List<Map<String,Object>> nestedList = new ArrayList<>();
  320. //查出历史数据 然后组合保存
  321. Map<String, Object> oldMataMap = elasticSearchUtil.findById(index, type, profileId);
  322. if(oldMataMap != null && oldMataMap.size() > 0){
  323. //组装 子集历史数据,更改当前字段值 在添加
  324. List<Map<String, Object>> childList = (List<Map<String, Object>>)oldMataMap.get(parentCode);
  325. if(childList != null && childList.size() > 0){
  326. boolean isexist = false;
  327. for(Map<String, Object> map : childList){
  328. if(subRowKey.equals(map.get(primaryKeyCode).toString())){
  329. map.put(primaryKeyCode,subRowKey);
  330. map.put(cloumnCode,dataConver(dataType,baseCloumnValue));
  331. if(StringUtils.isNotEmpty(dict)){
  332. map = extendDictData(map,cloumnCode,dict,baseCloumnValue);
  333. }else if(StringUtils.isNotEmpty(algorithm)){
  334. //其他算法 反射方式 todo
  335. }
  336. isexist = true;
  337. }
  338. nestedList.add(map);
  339. }
  340. if( !isexist){
  341. Map<String,Object> map = new HashMap<>();
  342. map.put(primaryKeyCode,subRowKey);
  343. map.put(cloumnCode,dataConver(dataType,baseCloumnValue));
  344. if(StringUtils.isNotEmpty(dict)){
  345. map = extendDictData(map,cloumnCode,dict,baseCloumnValue);
  346. }else if(StringUtils.isNotEmpty(algorithm)){
  347. //其他算法
  348. }
  349. nestedList.add(map);
  350. }
  351. source.put(parentCode, nestedList);
  352. }else{
  353. Map<String,Object> map = new HashMap<>();
  354. map.put(primaryKeyCode,subRowKey);
  355. map.put(cloumnCode,dataConver(dataType,baseCloumnValue));
  356. nestedList.add(map);
  357. source.put(parentCode, nestedList);
  358. }
  359. }else{
  360. throw new Exception("没有找到数据,无法更新");
  361. }
  362. }
  363. return source;
  364. }catch (Exception e){
  365. throw e;
  366. }
  367. }
  368. /**
  369. * 字典数据扩展
  370. * @param source 数据集合
  371. * @param cloumnCode 列code
  372. * @param dict 字典ID
  373. * @param code 字典编码
  374. * @return
  375. */
  376. public Map<String, Object> extendDictData(Map<String, Object> source ,String cloumnCode,String dict,String code){
  377. DictFunc dictFunc = new DictFunc();
  378. String value = "";
  379. String param[] = {dict,code};
  380. // value = dictFunc.execute(param);
  381. value = cloumnCode + "测试值";
  382. source.put(cloumnCode,code);
  383. source.put(cloumnCode + "_name",value);
  384. return source;
  385. }
  386. // /**
  387. // * 年龄扩展年龄段 及年龄段字典
  388. // * @param source 数据集合
  389. // * @param cloumnCode 列code
  390. // * @param dict 字典ID
  391. // * @param sourceValue 年龄
  392. // * @return
  393. // */
  394. // public Map<String, Object> extendAgeGroupData(Map<String, Object> source ,String cloumnCode,String dict,String sourceValue){
  395. // String value = "";
  396. // DictFunc dictFunc = new DictFunc();
  397. // AgeGroupFunc ageGroupFunc = new AgeGroupFunc();
  398. // String ageGroup = ageGroupFunc.execute(Integer.valueOf(sourceValue));
  399. // String param[] = {dict,ageGroup};
  400. //// value = dictFunc.execute(param);
  401. // value = "年龄段=" + ageGroup;
  402. // if(StringUtils.isNotEmpty(dict)){
  403. // source.put(cloumnCode,ageGroup);
  404. // source.put(cloumnCode + "Name",value);
  405. // }
  406. // return source;
  407. // }
  408. // /**
  409. // * 区域扩展
  410. // * @param source 数据集合
  411. // * @param cloumnCode 列code
  412. // * @param algorithmParm 算法参数
  413. // * @param sourceValue 年龄
  414. // * @return
  415. // */
  416. // public Map<String, Object> extendDivisionData(Map<String, Object> source ,String cloumnCode,String algorithmParm,String sourceValue){
  417. // DivisionFunc divisionFunc = new DivisionFunc();
  418. // String level = "1";
  419. // String divisionVal = "";
  420. // if(cloumnCode.toLowerCase().equals("town")){
  421. // level = "1";
  422. // divisionVal = "婺源县";
  423. // }else if(cloumnCode.toLowerCase().equals("city")){
  424. // level = "2";
  425. // divisionVal = "上饶市";
  426. // }else if(cloumnCode.toLowerCase().equals("province")){
  427. // level = "3";
  428. // divisionVal = "江西省";
  429. // }
  430. // String divisionParam[] = {sourceValue,level};
  431. //// divisionVal = divisionFunc.execute(divisionParam);
  432. // source.put(cloumnCode,divisionVal);
  433. // return source;
  434. // }
  435. /**
  436. * 添加修改 数据
  437. * @param index
  438. * @param type
  439. * @param rowKey
  440. * @param source
  441. * @throws ParseException
  442. */
  443. public void saveElasticSearchData(String index,String type,String rowKey,Map<String, Object> source) throws ParseException {
  444. Map<String, Object> data = elasticSearchUtil.findById(index, type, rowKey);
  445. if(data != null){
  446. elasticSearchUtil.update(index, type,rowKey,source);
  447. }else {
  448. source.put("extra_total","1");
  449. elasticSearchUtil.index(index, type,source);
  450. }
  451. }
  452. /**
  453. * 数据类型转换
  454. * @param dataType
  455. * @param keyValue
  456. */
  457. public Object dataConver(String dataType,String keyValue) throws Exception{
  458. Object value = null;
  459. NumberFormat nf = NumberFormat.getInstance();
  460. try {
  461. if(StringUtils.isNotEmpty(keyValue)){
  462. dataType = dataType.toLowerCase();
  463. if(dataType.equals("string")){
  464. value = keyValue;
  465. }else if(dataType.equals("integer")){
  466. int intValue = Integer.valueOf(keyValue);
  467. value = intValue;
  468. }else if(dataType.equals("double")){
  469. nf.setGroupingUsed(false);
  470. nf.setMaximumFractionDigits(2);
  471. double doubleValue = Double.valueOf(keyValue);
  472. value = doubleValue;
  473. }else if(dataType.equals("date")){
  474. Date dateValue = null;
  475. if(keyValue.length() > 10){
  476. if(keyValue.contains("-") ){
  477. //2016-08-04T00:00:00Z+0800
  478. keyValue = keyValue.substring(0,19);
  479. keyValue = keyValue.replace('T',' ');
  480. dateValue = DateUtil.parseDate(keyValue, DateUtil.DEFAULT_YMDHMSDATE_FORMAT);
  481. }else {
  482. //时间戳 1531130451000
  483. dateValue = DateUtil.toDateFromTime(keyValue);
  484. }
  485. }else {
  486. dateValue = DateUtil.parseDate(keyValue, DateUtil.DEFAULT_DATE_YMD_FORMAT);
  487. }
  488. //es 保存是少8小时
  489. Calendar ca = Calendar.getInstance();
  490. ca.setTime(dateValue);
  491. ca.add(Calendar.HOUR_OF_DAY, 8);
  492. value = ca.getTime();
  493. }
  494. }
  495. }catch (Exception e){
  496. e.printStackTrace();
  497. logger.debug("数据转换异常!");
  498. }
  499. return value;
  500. }
  501. public String converMapObject(Object object){
  502. Object[] obj = (Object[]) object;
  503. if(obj.length > 0){
  504. return obj[0].toString();
  505. }
  506. return "";
  507. }
  508. }