|
@ -0,0 +1,414 @@
|
|
|
package com.yihu.quota.service.cube;
|
|
|
|
|
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
|
|
import com.google.gson.Gson;
|
|
|
import com.yihu.ehr.elasticsearch.ElasticSearchPool;
|
|
|
import com.yihu.ehr.elasticsearch.ElasticSearchUtil;
|
|
|
import com.yihu.ehr.util.datetime.DateUtil;
|
|
|
import com.yihu.quota.etl.formula.AgeGroupFunc;
|
|
|
import com.yihu.quota.etl.formula.DictFunc;
|
|
|
import com.yihu.quota.etl.formula.DivisionFunc;
|
|
|
import com.yihu.quota.model.cube.Cube;
|
|
|
import com.yihu.quota.model.cube.CubeMemberMapping;
|
|
|
import com.yihu.quota.util.ElasticSearchHandler;
|
|
|
import com.yihu.quota.vo.CubeMappingModel;
|
|
|
import com.yihu.quota.vo.CubeMemberMappingModel;
|
|
|
import net.bytebuddy.implementation.bytecode.Throw;
|
|
|
import org.apache.commons.lang.StringUtils;
|
|
|
import org.slf4j.Logger;
|
|
|
import org.slf4j.LoggerFactory;
|
|
|
import org.springframework.beans.factory.annotation.Autowired;
|
|
|
import org.springframework.stereotype.Service;
|
|
|
|
|
|
import java.text.NumberFormat;
|
|
|
import java.text.ParseException;
|
|
|
import java.util.*;
|
|
|
|
|
|
/**
|
|
|
* Created by janseny on 2018/9/18.
|
|
|
*/
|
|
|
@Service
|
|
|
public class ElasticSearchDataProcessService {
|
|
|
private static Logger logger = LoggerFactory.getLogger(ElasticSearchDataProcessService.class);
|
|
|
private static String dataSource_hbase = "hbase";
|
|
|
private static String dataSource_mysql = "mysql";
|
|
|
private static String action_del = "DeleteFamily";//删除整行
|
|
|
private static String action_put = "Put"; //添加和修改单个字段值
|
|
|
|
|
|
private static String dataSource_k = "dataSource";
|
|
|
private static String table_k = "table";
|
|
|
private static String id_k = "_id";
|
|
|
private static String rowKey_k = "rowkey";
|
|
|
private static String profileId_k = "profile_id";
|
|
|
private static String action_k = "action";
|
|
|
|
|
|
@Autowired
|
|
|
private ObjectMapper objectMapper;
|
|
|
@Autowired
|
|
|
private CubeMappingService cubeMappingService;
|
|
|
@Autowired
|
|
|
private CubeMemberMappingService cubeMemberMappingService;
|
|
|
@Autowired
|
|
|
private ElasticSearchUtil elasticSearchUtil;
|
|
|
|
|
|
/**
|
|
|
*
|
|
|
* @param data json 数据串
|
|
|
*/
|
|
|
public void saveData(String data){
|
|
|
try {
|
|
|
Gson gson = new Gson();
|
|
|
Map<String, Object> dataMap = gson.fromJson(data, Map.class);
|
|
|
if(dataMap.containsKey(dataSource_k)){
|
|
|
String dataSource = dataMap.get(dataSource_k).toString();
|
|
|
dataMap.remove(dataSource_k);
|
|
|
if(dataSource.toLowerCase().equals(dataSource_hbase)){
|
|
|
hbaseDataProcess(dataMap);
|
|
|
}else if(dataSource.toLowerCase().equals(dataSource_mysql)){
|
|
|
mysqlDataProcess(dataMap);
|
|
|
}
|
|
|
}
|
|
|
} catch (Exception e) {
|
|
|
logger.debug("json数据转换异常");
|
|
|
e.getMessage();
|
|
|
}
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* @param dataMap
|
|
|
* 如果是子集的数据 hbase 过来要指定父级数据
|
|
|
* 如果是 删除 子表中的一行数据,es 这边就无法确定删除哪个数据
|
|
|
*/
|
|
|
public void hbaseDataProcess(Map<String, Object> dataMap) throws Exception{
|
|
|
Map<String, Object> source = new HashMap<>();
|
|
|
String table = "";
|
|
|
String rowKey = "";
|
|
|
String profileId = "";
|
|
|
String action = "";
|
|
|
if(dataMap.containsKey(table_k)){
|
|
|
table = dataMap.get(table_k).toString();
|
|
|
}
|
|
|
if(dataMap.containsKey(rowKey_k)){
|
|
|
rowKey = dataMap.get(rowKey_k).toString();
|
|
|
source.put(id_k,rowKey);
|
|
|
source.put(rowKey_k,rowKey);
|
|
|
}
|
|
|
if(dataMap.containsKey(profileId_k)){
|
|
|
profileId = dataMap.get(profileId_k).toString();
|
|
|
}
|
|
|
if(dataMap.containsKey(action_k)){
|
|
|
action = dataMap.get(action_k).toString();
|
|
|
}
|
|
|
try {
|
|
|
dataMap.remove(table_k);
|
|
|
dataMap.remove(rowKey_k);
|
|
|
dataMap.remove(action_k);
|
|
|
String keyValue = "";
|
|
|
if(action.contains(action_put)){
|
|
|
for(String hbaseColCode : dataMap.keySet()){
|
|
|
if(dataMap.get(hbaseColCode)!= null){
|
|
|
keyValue = dataMap.get(hbaseColCode).toString();
|
|
|
}
|
|
|
List<CubeMappingModel> cubeMappingModels = cubeMappingService.findCubeMappingModelsByFieldCode(table, hbaseColCode);
|
|
|
if(cubeMappingModels != null && cubeMappingModels.size() > 0){
|
|
|
for(CubeMappingModel cubeMappingModel :cubeMappingModels){
|
|
|
String index = cubeMappingModel.getIndexName();
|
|
|
String type = cubeMappingModel.getIndexType();
|
|
|
String cloumnCode = cubeMappingModel.getDimensionCode();
|
|
|
String dataType = cubeMappingModel.getDataType();
|
|
|
String dict = cubeMappingModel.getDict();
|
|
|
String algorithm = cubeMappingModel.getAlgorithm();
|
|
|
String algorithmParm = cubeMappingModel.getParm();
|
|
|
//维度数据扩展保存
|
|
|
Map<String, Object> esDataMap = dimensionDataExtendToMap(keyValue, cloumnCode, dataType, dict, algorithm, algorithmParm);
|
|
|
source.putAll(esDataMap);
|
|
|
saveElasticSearchData(index, type,rowKey,source);
|
|
|
}
|
|
|
}
|
|
|
//如果存在维度的话 rowkey 赋值给 profileId
|
|
|
if(cubeMappingModels != null){
|
|
|
profileId = rowKey;
|
|
|
}
|
|
|
// 是否是子集属性
|
|
|
List<CubeMemberMappingModel> cubeMemberMappingModels = cubeMemberMappingService.findCubeMemberMappingModels(table,hbaseColCode);
|
|
|
if(cubeMemberMappingModels != null && cubeMemberMappingModels.size() > 0){
|
|
|
for(CubeMemberMappingModel cubeMemberMappingModel :cubeMemberMappingModels){
|
|
|
String index = cubeMemberMappingModel.getIndexName();
|
|
|
String type = cubeMemberMappingModel.getIndexType();
|
|
|
String subRowKey = rowKey;
|
|
|
if(action.contains(action_put)){
|
|
|
//维度成员数据扩展保存
|
|
|
source = dimensionMemberDataExtendToMap(cubeMemberMappingModel,keyValue,subRowKey,profileId);
|
|
|
saveElasticSearchData(index, type,profileId,source);
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
}else if(action.contains(action_del)){
|
|
|
//一个表只能对应到一个 索引type
|
|
|
Cube cube = cubeMemberMappingService.findCubeByTableCode(table);
|
|
|
if(cube != null){
|
|
|
elasticSearchUtil.delete(cube.getIndexType(),cube.getIndexType(),rowKey);
|
|
|
}else {
|
|
|
throw new Exception("视图,表不存在");
|
|
|
}
|
|
|
}
|
|
|
}catch (ParseException e){
|
|
|
logger.debug("elasticSearch 执行失败");
|
|
|
e.printStackTrace();
|
|
|
e.getMessage();
|
|
|
} catch (Exception e) {
|
|
|
logger.debug("数据解析异常");
|
|
|
e.printStackTrace();
|
|
|
}
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* 维度数据扩展 转map
|
|
|
* @param cloumnValue
|
|
|
* @param cloumnCode
|
|
|
* @param dataType
|
|
|
* @param dict
|
|
|
* @param algorithm
|
|
|
* @param algorithmParm
|
|
|
* @return
|
|
|
*/
|
|
|
public Map<String,Object> dimensionDataExtendToMap(String cloumnValue,String cloumnCode,String dataType ,String dict,String algorithm,String algorithmParm ){
|
|
|
Map<String, Object> source = new HashMap<>();
|
|
|
//字典扩展
|
|
|
if(StringUtils.isNotEmpty(dict) && StringUtils.isEmpty(algorithm)){
|
|
|
source = extendDictData(source,cloumnCode,dict,cloumnValue);
|
|
|
}else if(StringUtils.isNotEmpty(algorithm)){
|
|
|
//年龄段
|
|
|
if(algorithm.equals("AgeGroupFunc")){
|
|
|
source = extendAgeGroupData(source,cloumnCode,dict,cloumnValue);
|
|
|
}
|
|
|
//其他算法 --
|
|
|
}else {
|
|
|
source.put(cloumnCode,dataConver(dataType,cloumnValue));
|
|
|
}
|
|
|
return source;
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* 维度成员数据扩展 转map
|
|
|
* @param cubeMemberMappingModel
|
|
|
* @param cloumnValue
|
|
|
* @param subRowKey
|
|
|
* @return
|
|
|
*/
|
|
|
public Map<String,Object> dimensionMemberDataExtendToMap(CubeMemberMappingModel cubeMemberMappingModel,String cloumnValue,String subRowKey,String profileId) throws Exception {
|
|
|
Map<String, Object> source = new HashMap<>();
|
|
|
String cloumnCode = cubeMemberMappingModel.getDimensionCode();
|
|
|
String parentCode = cubeMemberMappingModel.getParentCode();
|
|
|
String dataType = cubeMemberMappingModel.getDataType();
|
|
|
String dict = cubeMemberMappingModel.getDict();
|
|
|
String algorithm = cubeMemberMappingModel.getAlgorithm();
|
|
|
String algorithmParm = cubeMemberMappingModel.getParm();
|
|
|
int childSaveType = cubeMemberMappingModel.getChildSaveType();
|
|
|
String index = cubeMemberMappingModel.getIndexName();
|
|
|
String type = cubeMemberMappingModel.getIndexType();
|
|
|
if(childSaveType == 1 ){//对象方式
|
|
|
Map<String, Object> objChildMap = new HashMap<>();
|
|
|
if(StringUtils.isNotEmpty(dict)){
|
|
|
objChildMap = extendDictData(objChildMap,cloumnCode,dict,cloumnValue);
|
|
|
source.put(parentCode,objChildMap);
|
|
|
}else if(StringUtils.isNotEmpty(algorithm)){
|
|
|
//区域算法
|
|
|
if(algorithm.equals("DivisionFunc") && StringUtils.isNotEmpty(algorithmParm)){
|
|
|
objChildMap = extendDivisionData(objChildMap, cloumnCode, algorithmParm, cloumnValue);
|
|
|
}
|
|
|
source.put(parentCode,objChildMap);
|
|
|
//其他算法 统一改为反射的方式 计算 TODO
|
|
|
}else {
|
|
|
objChildMap.put(cloumnCode,cloumnValue);
|
|
|
}
|
|
|
source.put(parentCode,objChildMap);
|
|
|
}else if(childSaveType == 2 ){//nested 方式
|
|
|
//查找子集主键字段
|
|
|
CubeMemberMapping primaryCubeMember = cubeMemberMappingService.findCubeMemberMappingPrimary(cubeMemberMappingModel.getCubeMappingId());
|
|
|
String primaryKeyCode = "";
|
|
|
if(primaryCubeMember != null ){
|
|
|
primaryKeyCode = primaryCubeMember.getDimensionCode();
|
|
|
}else {
|
|
|
return null;
|
|
|
}
|
|
|
List<Map<String,Object>> nestedList = new ArrayList<>();
|
|
|
//查出历史数据 然后组合保存
|
|
|
Map<String, Object> oldMataMap = elasticSearchUtil.findById(index, type, profileId);
|
|
|
if(oldMataMap != null && oldMataMap.size() > 0){
|
|
|
//组装 子集历史数据,更改当前字段值 在添加
|
|
|
List<Map<String, Object>> childList = (List<Map<String, Object>>)oldMataMap.get(parentCode);
|
|
|
if(childList != null && childList.size() > 0){
|
|
|
boolean isexist = false;
|
|
|
for(Map<String, Object> map : childList){
|
|
|
if(subRowKey.equals(map.get(primaryKeyCode).toString())){
|
|
|
map.put(primaryKeyCode,subRowKey);
|
|
|
map.put(cloumnCode,dataConver(dataType,cloumnValue));
|
|
|
if(StringUtils.isNotEmpty(dict)){
|
|
|
map = extendDictData(map,cloumnCode,dict,cloumnValue);
|
|
|
}else if(StringUtils.isNotEmpty(algorithm)){
|
|
|
//其他算法
|
|
|
}
|
|
|
isexist = true;
|
|
|
}
|
|
|
nestedList.add(map);
|
|
|
}
|
|
|
if( !isexist){
|
|
|
Map<String,Object> map = new HashMap<>();
|
|
|
map.put(primaryKeyCode,subRowKey);
|
|
|
map.put(cloumnCode,dataConver(dataType,cloumnValue));
|
|
|
if(StringUtils.isNotEmpty(dict)){
|
|
|
map = extendDictData(map,cloumnCode,dict,cloumnValue);
|
|
|
}else if(StringUtils.isNotEmpty(algorithm)){
|
|
|
//其他算法
|
|
|
}
|
|
|
nestedList.add(map);
|
|
|
}
|
|
|
source.put(parentCode, nestedList);
|
|
|
}else{
|
|
|
Map<String,Object> map = new HashMap<>();
|
|
|
map.put(primaryKeyCode,subRowKey);
|
|
|
map.put(cloumnCode,dataConver(dataType,cloumnValue));
|
|
|
nestedList.add(map);
|
|
|
source.put(parentCode, nestedList);
|
|
|
}
|
|
|
}else{
|
|
|
throw new Exception("没有找到数据,无法更新");
|
|
|
}
|
|
|
}
|
|
|
return source;
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* 字典数据扩展
|
|
|
* @param source 数据集合
|
|
|
* @param cloumnCode 列code
|
|
|
* @param dict 字典ID
|
|
|
* @param code 字典编码
|
|
|
* @return
|
|
|
*/
|
|
|
public Map<String, Object> extendDictData(Map<String, Object> source ,String cloumnCode,String dict,String code){
|
|
|
DictFunc dictFunc = new DictFunc();
|
|
|
String value = "";
|
|
|
String param[] = {dict,code};
|
|
|
// value = dictFunc.execute(param);
|
|
|
value = cloumnCode + "测试值";
|
|
|
source.put(cloumnCode,code);
|
|
|
source.put(cloumnCode + "Name",value);
|
|
|
return source;
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* 年龄扩展年龄段 及年龄段字典
|
|
|
* @param source 数据集合
|
|
|
* @param cloumnCode 列code
|
|
|
* @param dict 字典ID
|
|
|
* @param sourceValue 年龄
|
|
|
* @return
|
|
|
*/
|
|
|
public Map<String, Object> extendAgeGroupData(Map<String, Object> source ,String cloumnCode,String dict,String sourceValue){
|
|
|
String value = "";
|
|
|
DictFunc dictFunc = new DictFunc();
|
|
|
AgeGroupFunc ageGroupFunc = new AgeGroupFunc();
|
|
|
String ageGroup = ageGroupFunc.execute(Integer.valueOf(sourceValue));
|
|
|
String param[] = {dict,ageGroup};
|
|
|
// value = dictFunc.execute(param);
|
|
|
value = "年龄段=" + ageGroup;
|
|
|
if(StringUtils.isNotEmpty(dict)){
|
|
|
source.put(cloumnCode,ageGroup);
|
|
|
source.put(cloumnCode + "Name",value);
|
|
|
}
|
|
|
return source;
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* 区域扩展
|
|
|
* @param source 数据集合
|
|
|
* @param cloumnCode 列code
|
|
|
* @param algorithmParm 算法参数
|
|
|
* @param sourceValue 年龄
|
|
|
* @return
|
|
|
*/
|
|
|
public Map<String, Object> extendDivisionData(Map<String, Object> source ,String cloumnCode,String algorithmParm,String sourceValue){
|
|
|
DivisionFunc divisionFunc = new DivisionFunc();
|
|
|
String level = "1";
|
|
|
String divisionVal = "";
|
|
|
if(cloumnCode.toLowerCase().equals("town")){
|
|
|
level = "1";
|
|
|
divisionVal = "信州区";
|
|
|
}else if(cloumnCode.toLowerCase().equals("city")){
|
|
|
level = "2";
|
|
|
divisionVal = "上饶市";
|
|
|
}else if(cloumnCode.toLowerCase().equals("province")){
|
|
|
level = "3";
|
|
|
divisionVal = "江西省";
|
|
|
}
|
|
|
String divisionParam[] = {sourceValue,level};
|
|
|
// divisionVal = divisionFunc.execute(divisionParam);
|
|
|
source.put(cloumnCode,divisionVal);
|
|
|
return source;
|
|
|
}
|
|
|
|
|
|
|
|
|
/**
|
|
|
* 添加修改 数据
|
|
|
* @param index
|
|
|
* @param type
|
|
|
* @param rowKey
|
|
|
* @param source
|
|
|
* @throws ParseException
|
|
|
*/
|
|
|
public void saveElasticSearchData(String index,String type,String rowKey,Map<String, Object> source) throws ParseException {
|
|
|
Map<String, Object> data = elasticSearchUtil.findById(index, type, rowKey);
|
|
|
if(data != null){
|
|
|
elasticSearchUtil.update(index, type,rowKey,source);
|
|
|
}else {
|
|
|
elasticSearchUtil.index(index, type,source);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
|
|
|
/**
|
|
|
* @param dataMap
|
|
|
*/
|
|
|
public Map<String, Object> mysqlDataProcess(Map<String, Object> dataMap){
|
|
|
Map<String, Object> source = new HashMap<>();
|
|
|
String table = dataMap.get(table_k).toString();
|
|
|
String rowKey = dataMap.get(rowKey_k).toString();
|
|
|
|
|
|
|
|
|
//处理 数据库执行动作 增删改
|
|
|
//TODO
|
|
|
|
|
|
return source;
|
|
|
}
|
|
|
|
|
|
|
|
|
/**
|
|
|
* 数据类型转换
|
|
|
* @param dataType
|
|
|
* @param keyValue
|
|
|
*/
|
|
|
public Object dataConver(String dataType,String keyValue){
|
|
|
NumberFormat nf = NumberFormat.getInstance();
|
|
|
Object value = null;
|
|
|
dataType = dataType.toLowerCase();
|
|
|
if(dataType.equals("string")){
|
|
|
value = keyValue;
|
|
|
}else if(dataType.equals("int")){
|
|
|
int intValue = Integer.valueOf(keyValue);
|
|
|
value = intValue;
|
|
|
}else if(dataType.equals("double")){
|
|
|
nf.setGroupingUsed(false);
|
|
|
nf.setMaximumFractionDigits(2);
|
|
|
double doubleValue = Double.valueOf(keyValue);
|
|
|
value = doubleValue;
|
|
|
}else if(dataType.equals("date")){
|
|
|
Date dateValue = DateUtil.formatCharDateYMDHMS(keyValue);
|
|
|
value = dateValue;
|
|
|
}
|
|
|
return value;
|
|
|
}
|
|
|
|
|
|
}
|