ElasticSearchDataProcessService.java 13 KB


  1. package com.yihu.quota.service.cube;
  2. import com.fasterxml.jackson.databind.ObjectMapper;
  3. import com.google.gson.Gson;
  4. import com.yihu.ehr.elasticsearch.ElasticSearchPool;
  5. import com.yihu.ehr.elasticsearch.ElasticSearchUtil;
  6. import com.yihu.ehr.util.datetime.DateUtil;
  7. import com.yihu.quota.etl.formula.AgeGroupFunc;
  8. import com.yihu.quota.etl.formula.DictFunc;
  9. import com.yihu.quota.etl.formula.DivisionFunc;
  10. import com.yihu.quota.util.ElasticSearchHandler;
  11. import com.yihu.quota.vo.CubeMappingModel;
  12. import com.yihu.quota.vo.CubeMemberMappingModel;
  13. import org.apache.commons.lang.StringUtils;
  14. import org.slf4j.Logger;
  15. import org.slf4j.LoggerFactory;
  16. import org.springframework.beans.factory.annotation.Autowired;
  17. import org.springframework.stereotype.Service;
  18. import java.text.NumberFormat;
  19. import java.text.ParseException;
  20. import java.util.Date;
  21. import java.util.HashMap;
  22. import java.util.List;
  23. import java.util.Map;
  24. /**
  25. * Created by janseny on 2018/9/18.
  26. */
  27. @Service
  28. public class ElasticSearchDataProcessService {
  29. private static Logger logger = LoggerFactory.getLogger(ElasticSearchDataProcessService.class);
  30. private static String dataSource_hbase = "hbase";
  31. private static String dataSource_mysql = "mysql";
  32. private static String action_put = "Put";//添加和修改单个字段值
  33. private static String action_del = "DeleteColumn";//删除单个字段值
  34. private static String action_delFamily = "DeleteFamily";//删除整行
  35. private static String dataSource_k = "dataSource";
  36. private static String table_k = "table";
  37. private static String rowKey_k = "rowkey";
  38. private static String action_k = "action";
  39. @Autowired
  40. private ObjectMapper objectMapper;
  41. @Autowired
  42. private CubeMappingService cubeMappingService;
  43. @Autowired
  44. private CubeMemberMappingService cubeMemberMappingService;
  45. @Autowired
  46. private ElasticSearchPool elasticSearchPool;
  47. @Autowired
  48. private ElasticSearchUtil elasticSearchUtil;
  49. /**
  50. *
  51. * @param data json 数据串
  52. */
  53. public void saveData(String data){
  54. try {
  55. Gson gson = new Gson();
  56. Map<String, Object> dataMap = gson.fromJson(data, Map.class);
  57. if(dataMap.containsKey(dataSource_k)){
  58. String dataSource = dataMap.get(dataSource_k).toString();
  59. dataMap.remove(dataSource_k);
  60. if(dataSource.toLowerCase().equals(dataSource_hbase)){
  61. hbaseDataProcess(dataMap);
  62. }else if(dataSource.toLowerCase().equals(dataSource_mysql)){
  63. mysqlDataProcess(dataMap);
  64. }
  65. }
  66. } catch (Exception e) {
  67. logger.debug("json数据转换异常");
  68. e.getMessage();
  69. }
  70. }
  71. /**
  72. * @param dataMap
  73. */
  74. public void hbaseDataProcess(Map<String, Object> dataMap){
  75. Map<String, Object> source = new HashMap<>();
  76. String index = "";
  77. String type = "";
  78. String table = dataMap.get(table_k).toString();
  79. //通过表找到 对应的数据集 保存的索引和type
  80. //TODO 可以维护到数据字典 - 保存到redis 减少去数据库里面查询
  81. String rowKey = dataMap.get(rowKey_k).toString();
  82. String action = dataMap.get(action_k).toString();
  83. dataMap.remove(table_k);
  84. dataMap.remove(rowKey_k);
  85. dataMap.remove(action_k);
  86. try {
  87. if(action.contains(action_put)){
  88. String keyValue = "";
  89. for(String key : dataMap.keySet()){
  90. if(dataMap.get(key)!= null){
  91. keyValue = dataMap.get(key).toString();
  92. }
  93. //根据列名 查找出 对应的维度code及是否要数据字典,是否通过算法扩展出来
  94. // 是否是子集模式中
  95. List<CubeMappingModel> cubeMappingModels = cubeMappingService.findCubeMappingModelsByFieldCode(table, key);
  96. if(cubeMappingModels != null && cubeMappingModels.size() > 0){
  97. for(CubeMappingModel cubeMappingModel :cubeMappingModels){
  98. String cloumnCode = cubeMappingModel.getDimensionCode();
  99. DictFunc dictFunc = new DictFunc();
  100. //字典扩展
  101. if(StringUtils.isNotEmpty(cubeMappingModel.getDict()) && StringUtils.isEmpty(cubeMappingModel.getAlgorithm())){
  102. String param[] = {cubeMappingModel.getDict(),keyValue};
  103. String value = dictFunc.execute(param);
  104. String dictCode = cloumnCode + ".Code";
  105. String dictName = cloumnCode + ".Name";
  106. source.put(dictCode,key);
  107. source.put(dictName,value);
  108. }else if(StringUtils.isNotEmpty(cubeMappingModel.getAlgorithm())){
  109. //计算后 又经过字典 如:年龄段
  110. if(cubeMappingModel.getAlgorithm().equals("AgeGroupFunc") && StringUtils.isNotEmpty(cubeMappingModel.getParm())){
  111. AgeGroupFunc ageGroupFunc = new AgeGroupFunc();
  112. String ageGroup = ageGroupFunc.execute(keyValue);
  113. String param[] = {cubeMappingModel.getDict(),ageGroup};
  114. String value = dictFunc.execute(param);
  115. if(StringUtils.isNotEmpty(cubeMappingModel.getDict())){
  116. String dictCode = cloumnCode + ".Code";
  117. String dictName = cloumnCode + ".Name";
  118. source.put(dictCode,key);
  119. source.put(dictName,value);
  120. }
  121. }
  122. //其他具体维度 对应具体算法
  123. }else {
  124. if(StringUtils.isNotEmpty(cubeMappingModel.getDataType())){
  125. String dataType = cubeMappingModel.getDataType();
  126. Object value = dataConver(dataType,keyValue);
  127. source.put(cloumnCode,value);
  128. }else{
  129. source.put(cloumnCode,dataMap.get(key));
  130. }
  131. }
  132. }
  133. }
  134. List<CubeMemberMappingModel> cubeMemberMappingModels = cubeMemberMappingService.findCubeMemberMappingModels(table,key);
  135. if(cubeMemberMappingModels != null && cubeMemberMappingModels.size() > 0){
  136. for(CubeMemberMappingModel cubeMemberMappingModel :cubeMemberMappingModels){
  137. String cloumnCode = cubeMemberMappingModel.getDimensionCode();
  138. String parentCode = cubeMemberMappingModel.getParentCode();
  139. if(cubeMemberMappingModel.getChildSaveType() != null){
  140. int childSaveType = cubeMemberMappingModel.getChildSaveType();
  141. if(childSaveType == 1 ){//对象方式
  142. //拼装成数据结构
  143. String field = parentCode + "." + cloumnCode;
  144. source.put(field,keyValue);
  145. }
  146. if(childSaveType == 2 ){//nested 方式
  147. //查出历史数据 然后组合保存
  148. String field = parentCode + ".subRowkey";
  149. List<Map<String, Object>> subDataList = elasticSearchUtil.findByField(index, type, field, rowKey);
  150. if(subDataList != null && subDataList.size() > 0){
  151. String parentRowkey = subDataList.get(0).get(rowKey_k).toString();
  152. List<Map<String, Object>> dataList = elasticSearchUtil.findByField(index, type, rowKey_k, parentRowkey);
  153. if(dataList != null && dataList.size() > 0){
  154. //组装 子集历史数据 在添加
  155. }else {
  156. //单条添加
  157. }
  158. }
  159. }
  160. }else {
  161. //字典扩展
  162. if(StringUtils.isNotEmpty(cubeMemberMappingModel.getDict())){
  163. DictFunc dictFunc = new DictFunc();
  164. String param[] = {cubeMemberMappingModel.getDict(),cloumnCode};
  165. String value = dictFunc.execute(param);
  166. String dictCode = cloumnCode + ".Code";
  167. String dictName = cloumnCode + ".Name";
  168. source.put(dictCode,key);
  169. source.put(dictName,value);
  170. }else if(StringUtils.isNotEmpty(cubeMemberMappingModel.getAlgorithm())){
  171. if(cubeMemberMappingModel.getAlgorithm().equals("DivisionFunc") && StringUtils.isNotEmpty(cubeMemberMappingModel.getParm())){
  172. if(cubeMemberMappingModel.getParm().equals(key)){
  173. DivisionFunc divisionFunc = new DivisionFunc();
  174. String townParam[] = {dataMap.get(key).toString(),"1"};
  175. String townVal = divisionFunc.execute(townParam);
  176. String cityParam[] = {dataMap.get(key).toString(),"2"};
  177. String cityVal = divisionFunc.execute(cityParam);
  178. String provinceParam[] = {dataMap.get(key).toString(),"3"};
  179. String provinceVal = divisionFunc.execute(provinceParam);
  180. source.put("",townVal);
  181. source.put("",cityVal);
  182. source.put("",provinceVal);
  183. }
  184. }
  185. }else {
  186. if(StringUtils.isNotEmpty(cubeMemberMappingModel.getDataType())){
  187. String dataType = cubeMemberMappingModel.getDataType();
  188. Object value = dataConver(dataType,keyValue);
  189. source.put(cloumnCode,value);
  190. }else{
  191. source.put(cloumnCode,dataMap.get(key));
  192. }
  193. }
  194. }
  195. }
  196. }
  197. }
  198. elasticSearchUtil.index(index, type, dataMap);
  199. }else if (action.contains(action_del)){
  200. for(String key : dataMap.keySet()){
  201. dataMap.put(key,"");
  202. }
  203. //保存数据
  204. elasticSearchUtil.index(index, type, dataMap);
  205. }else if(action.contains(action_delFamily)){
  206. elasticSearchUtil.delete(index,type,rowKey);
  207. }
  208. }catch (ParseException e){
  209. logger.debug("elasticSearch 执行失败");
  210. e.printStackTrace();
  211. e.getMessage();
  212. } catch (Exception e) {
  213. logger.debug("数据解析异常");
  214. e.printStackTrace();
  215. }
  216. }
  217. /**
  218. * @param dataMap
  219. */
  220. public Map<String, Object> mysqlDataProcess(Map<String, Object> dataMap){
  221. Map<String, Object> source = new HashMap<>();
  222. String table = dataMap.get(table_k).toString();
  223. String rowKey = dataMap.get(rowKey_k).toString();
  224. //处理 数据库执行动作 增删改
  225. //TODO
  226. return source;
  227. }
  228. /**
  229. * 数据类型转换
  230. * @param dataType
  231. * @param keyValue
  232. */
  233. public Object dataConver(String dataType,String keyValue){
  234. NumberFormat nf = NumberFormat.getInstance();
  235. Object value = null;
  236. dataType = dataType.toLowerCase();
  237. if(dataType.equals("string")){
  238. value = keyValue;
  239. }else if(dataType.equals("int")){
  240. int intValue = Integer.valueOf(keyValue);
  241. value = intValue;
  242. }else if(dataType.equals("double")){
  243. nf.setGroupingUsed(false);
  244. nf.setMaximumFractionDigits(2);
  245. double doubleValue = Double.valueOf(keyValue);
  246. value = doubleValue;
  247. }else if(dataType.equals("date")){
  248. Date dateValue = DateUtil.formatCharDateYMDHMS(keyValue);
  249. value = dateValue;
  250. }
  251. return value;
  252. }
  253. }