|
@ -1,20 +1,15 @@
|
|
|
package com.yihu.quota.service.cube;
|
|
|
|
|
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
|
|
import com.google.gson.Gson;
|
|
|
import com.yihu.ehr.elasticsearch.ElasticSearchPool;
|
|
|
import com.yihu.ehr.elasticsearch.ElasticSearchUtil;
|
|
|
import com.yihu.ehr.util.datetime.DateUtil;
|
|
|
import com.yihu.quota.etl.formula.AgeGroupFunc;
|
|
|
import com.yihu.quota.etl.formula.DictFunc;
|
|
|
import com.yihu.quota.etl.formula.DivisionFunc;
|
|
|
import com.yihu.quota.etl.formula.FuncHelper;
|
|
|
import com.yihu.quota.model.cube.Cube;
|
|
|
import com.yihu.quota.model.cube.CubeMemberMapping;
|
|
|
import com.yihu.quota.util.ElasticSearchHandler;
|
|
|
import com.yihu.quota.vo.CubeMappingModel;
|
|
|
import com.yihu.quota.vo.CubeMemberMappingModel;
|
|
|
import net.bytebuddy.implementation.bytecode.Throw;
|
|
|
import org.apache.commons.lang.StringUtils;
|
|
|
import org.slf4j.Logger;
|
|
|
import org.slf4j.LoggerFactory;
|
|
@ -37,6 +32,7 @@ public class ElasticSearchDataProcessService {
|
|
|
private static String action_put = "Put"; //添加和修改单个字段值
|
|
|
|
|
|
private static String dataSource_k = "dataSource";
|
|
|
private static String database_k = "database";
|
|
|
private static String table_k = "table";
|
|
|
private static String id_k = "_id";
|
|
|
private static String rowKey_k = "rowkey";
|
|
@ -44,7 +40,7 @@ public class ElasticSearchDataProcessService {
|
|
|
private static String action_k = "action";
|
|
|
|
|
|
@Autowired
|
|
|
private ObjectMapper objectMapper;
|
|
|
private JdbcBasicService jdbcBasicService;
|
|
|
@Autowired
|
|
|
private CubeMappingService cubeMappingService;
|
|
|
@Autowired
|
|
@ -64,10 +60,9 @@ public class ElasticSearchDataProcessService {
|
|
|
String dataSource = dataMap.get(dataSource_k).toString();
|
|
|
dataMap.remove(dataSource_k);
|
|
|
if(dataSource.toLowerCase().equals(dataSource_hbase)){
|
|
|
hbaseDataProcess(dataMap);
|
|
|
dataProcess(dataMap);
|
|
|
}else if(dataSource.toLowerCase().equals(dataSource_mysql)){
|
|
|
// mysqlDataProcess(dataMap);
|
|
|
hbaseDataProcess(dataMap);
|
|
|
dataProcess(dataMap);
|
|
|
}
|
|
|
}
|
|
|
} catch (Exception e) {
|
|
@ -81,15 +76,15 @@ public class ElasticSearchDataProcessService {
|
|
|
* 如果是子集的数据 hbase 过来要指定父级数据
|
|
|
* 如果是 删除 子表中的一行数据,es 这边就无法确定删除哪个数据
|
|
|
*/
|
|
|
public void hbaseDataProcess(Map<String, Object> dataMap) throws Exception{
|
|
|
public void dataProcess(Map<String, Object> dataMap) throws Exception{
|
|
|
Map<String, Object> source = new HashMap<>();
|
|
|
String table = "";
|
|
|
String rowKey = "";
|
|
|
String profileId = "";
|
|
|
String action = "";
|
|
|
String database = "";
|
|
|
if (dataMap.containsKey("database")) {
|
|
|
database = dataMap.remove("database").toString();
|
|
|
if (dataMap.containsKey(database_k)) {
|
|
|
database = dataMap.remove(database_k).toString();
|
|
|
}
|
|
|
if(dataMap.containsKey(table_k)){
|
|
|
table = dataMap.get(table_k).toString();
|
|
@ -109,24 +104,20 @@ public class ElasticSearchDataProcessService {
|
|
|
dataMap.remove(table_k);
|
|
|
dataMap.remove(rowKey_k);
|
|
|
dataMap.remove(action_k);
|
|
|
String keyValue = "";
|
|
|
String baseCloumnValue = "";
|
|
|
if(action.contains(action_put)){
|
|
|
for(String hbaseColCode : dataMap.keySet()){
|
|
|
if(dataMap.get(hbaseColCode)!= null){
|
|
|
keyValue = dataMap.get(hbaseColCode).toString();
|
|
|
for(String baseCloumnCode : dataMap.keySet()){
|
|
|
if(dataMap.get(baseCloumnCode)!= null){
|
|
|
baseCloumnValue = dataMap.get(baseCloumnCode).toString();
|
|
|
}
|
|
|
List<CubeMappingModel> cubeMappingModels = cubeMappingService.findCubeMappingModelsByFieldCode(table, hbaseColCode);
|
|
|
List<CubeMappingModel> cubeMappingModels = cubeMappingService.findCubeMappingModelsByFieldCode(table, baseCloumnCode);
|
|
|
if(cubeMappingModels != null && cubeMappingModels.size() > 0){
|
|
|
for(CubeMappingModel cubeMappingModel :cubeMappingModels){
|
|
|
String index = cubeMappingModel.getIndexName();
|
|
|
String type = cubeMappingModel.getIndexType();
|
|
|
String cloumnCode = cubeMappingModel.getDimensionCode();
|
|
|
String dataType = cubeMappingModel.getDataType();
|
|
|
String dict = cubeMappingModel.getDict();
|
|
|
String algorithm = cubeMappingModel.getAlgorithm();
|
|
|
String algorithmParm = cubeMappingModel.getParm();
|
|
|
//维度数据扩展保存
|
|
|
Map<String, Object> esDataMap = dimensionDataExtendToMap(keyValue, cloumnCode, dataType, dict, algorithm, algorithmParm);
|
|
|
Map<String, Object> esDataMap = dimensionDataExtendToMap(cubeMappingModel,cloumnCode,baseCloumnValue);
|
|
|
source.putAll(esDataMap);
|
|
|
saveElasticSearchData(index, type,rowKey,source);
|
|
|
}
|
|
@ -136,7 +127,7 @@ public class ElasticSearchDataProcessService {
|
|
|
profileId = rowKey;
|
|
|
}
|
|
|
// 是否是子集属性
|
|
|
List<CubeMemberMappingModel> cubeMemberMappingModels = cubeMemberMappingService.findCubeMemberMappingModels(table,hbaseColCode);
|
|
|
List<CubeMemberMappingModel> cubeMemberMappingModels = cubeMemberMappingService.findCubeMemberMappingModels(table,baseCloumnCode);
|
|
|
if(cubeMemberMappingModels != null && cubeMemberMappingModels.size() > 0){
|
|
|
for(CubeMemberMappingModel cubeMemberMappingModel :cubeMemberMappingModels){
|
|
|
String index = cubeMemberMappingModel.getIndexName();
|
|
@ -144,11 +135,45 @@ public class ElasticSearchDataProcessService {
|
|
|
String subRowKey = rowKey;
|
|
|
if(action.contains(action_put)){
|
|
|
//维度成员数据扩展保存
|
|
|
source = dimensionMemberDataExtendToMap(cubeMemberMappingModel,keyValue,subRowKey,profileId);
|
|
|
source = dimensionMemberDataExtendToMap(cubeMemberMappingModel,baseCloumnValue,subRowKey,profileId);
|
|
|
saveElasticSearchData(index, type,profileId,source);
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
//是否为关联字段 start
|
|
|
List<CubeMappingModel> relationMappingModels = cubeMappingService.findRelationMappingModelsByFieldCode(table, baseCloumnCode);
|
|
|
if(relationMappingModels != null && relationMappingModels.size() > 0) {
|
|
|
for (CubeMappingModel cubeMappingModel : relationMappingModels) {
|
|
|
Object object = jdbcBasicService.getEntityByRelationId(database ,table, baseCloumnCode, cubeMappingModel.getRelationFieldId());
|
|
|
String value = converMapObject(object);
|
|
|
String index = cubeMappingModel.getIndexName();
|
|
|
String type = cubeMappingModel.getIndexType();
|
|
|
String cloumnCode = cubeMappingModel.getDimensionCode();
|
|
|
//维度数据扩展保存
|
|
|
Map<String, Object> esDataMap = dimensionDataExtendToMap(cubeMappingModel,cloumnCode,value);
|
|
|
source.putAll(esDataMap);
|
|
|
saveElasticSearchData(index, type,rowKey,source);
|
|
|
}
|
|
|
}
|
|
|
List<CubeMemberMappingModel> relationMemberMappingModels = cubeMemberMappingService.findRelationMemberMappingModels(table,baseCloumnCode);
|
|
|
if(relationMemberMappingModels != null && relationMemberMappingModels.size() > 0) {
|
|
|
for (CubeMemberMappingModel cubeMemberMappingModel : relationMemberMappingModels) {
|
|
|
//父级对象
|
|
|
if(cubeMemberMappingModel.getChildSaveType() == 1){
|
|
|
Object object = jdbcBasicService.getEntityByRelationId(database ,table, baseCloumnCode, cubeMemberMappingModel.getRelationFieldId());
|
|
|
String value = converMapObject(object);
|
|
|
String index = cubeMemberMappingModel.getIndexName();
|
|
|
String type = cubeMemberMappingModel.getIndexType();
|
|
|
String subRowKey = rowKey;
|
|
|
if(action.contains(action_put)){
|
|
|
//维度成员数据扩展保存
|
|
|
source = dimensionMemberDataExtendToMap(cubeMemberMappingModel,value,subRowKey,profileId);
|
|
|
saveElasticSearchData(index, type,profileId,source);
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
//是否为关联字段 end
|
|
|
}
|
|
|
}else if(action.contains(action_del)){
|
|
|
//一个表只能对应到一个 索引type
|
|
@ -171,20 +196,18 @@ public class ElasticSearchDataProcessService {
|
|
|
|
|
|
/**
|
|
|
* 维度数据扩展 转map
|
|
|
* @param cloumnValue
|
|
|
* @param cubeMappingModel
|
|
|
* @param cloumnCode
|
|
|
* @param dataType
|
|
|
* @param dict
|
|
|
* @param algorithm
|
|
|
* @param algorithmParm 格式如 维度值,如果需要后端获取后代替用value,每个参数直接用逗号分隔,像第一个和第二个参数用逗号,分开,如: value,2,3
|
|
|
* @return
|
|
|
*/
|
|
|
public Map<String,Object> dimensionDataExtendToMap(String cloumnValue,String cloumnCode,String dataType ,
|
|
|
String dict,String algorithm,String algorithmParm ) throws Exception {
|
|
|
public Map<String,Object> dimensionDataExtendToMap(CubeMappingModel cubeMappingModel,String cloumnCode,String baseCloumnValue) throws Exception {
|
|
|
String dataType = cubeMappingModel.getDataType();
|
|
|
String dict = cubeMappingModel.getDict();
|
|
|
String algorithm = cubeMappingModel.getAlgorithm();
|
|
|
Map<String, Object> source = new HashMap<>();
|
|
|
//字典扩展
|
|
|
if(StringUtils.isNotEmpty(dict) && StringUtils.isEmpty(algorithm)){
|
|
|
source = extendDictData(source,cloumnCode,dict,cloumnValue);
|
|
|
source = extendDictData(source,cloumnCode,dict,baseCloumnValue);
|
|
|
}else if(StringUtils.isNotEmpty(algorithm)){
|
|
|
// //通过 反射方式进行 后续开放 暂时字典redis不通
|
|
|
// FuncHelper funcHelper = new FuncHelper();
|
|
@ -197,10 +220,10 @@ public class ElasticSearchDataProcessService {
|
|
|
// }
|
|
|
//年龄段
|
|
|
if(algorithm.contains("AgeGroupFunc")){
|
|
|
source = extendAgeGroupData(source,cloumnCode,dict,cloumnValue);
|
|
|
source = extendAgeGroupData(source,cloumnCode,dict,baseCloumnValue);
|
|
|
}
|
|
|
}else {
|
|
|
source.put(cloumnCode,dataConver(dataType,cloumnValue));
|
|
|
source.put(cloumnCode,dataConver(dataType,baseCloumnValue));
|
|
|
}
|
|
|
return source;
|
|
|
}
|
|
@ -208,11 +231,11 @@ public class ElasticSearchDataProcessService {
|
|
|
/**
|
|
|
* 维度成员数据扩展 转map
|
|
|
* @param cubeMemberMappingModel
|
|
|
* @param cloumnValue
|
|
|
* @param baseCloumnValue
|
|
|
* @param subRowKey
|
|
|
* @return
|
|
|
*/
|
|
|
public Map<String,Object> dimensionMemberDataExtendToMap(CubeMemberMappingModel cubeMemberMappingModel,String cloumnValue,String subRowKey,String profileId) throws Exception {
|
|
|
public Map<String,Object> dimensionMemberDataExtendToMap(CubeMemberMappingModel cubeMemberMappingModel,String baseCloumnValue,String subRowKey,String profileId) throws Exception {
|
|
|
Map<String, Object> source = new HashMap<>();
|
|
|
String cloumnCode = cubeMemberMappingModel.getDimensionCode();
|
|
|
String parentCode = cubeMemberMappingModel.getParentCode();
|
|
@ -226,7 +249,7 @@ public class ElasticSearchDataProcessService {
|
|
|
if(childSaveType == 1 ){//对象方式
|
|
|
Map<String, Object> objChildMap = new HashMap<>();
|
|
|
if(StringUtils.isNotEmpty(dict)){
|
|
|
objChildMap = extendDictData(objChildMap,cloumnCode,dict,cloumnValue);
|
|
|
objChildMap = extendDictData(objChildMap,cloumnCode,dict,baseCloumnValue);
|
|
|
source.put(parentCode,objChildMap);
|
|
|
}else if(StringUtils.isNotEmpty(algorithm)){
|
|
|
|
|
@ -241,12 +264,12 @@ public class ElasticSearchDataProcessService {
|
|
|
// }
|
|
|
//区域算法
|
|
|
if(algorithm.contains("DivisionFunc") && StringUtils.isNotEmpty(algorithmParm)){
|
|
|
objChildMap = extendDivisionData(objChildMap, cloumnCode, algorithmParm, cloumnValue);
|
|
|
objChildMap = extendDivisionData(objChildMap, cloumnCode, algorithmParm, baseCloumnValue);
|
|
|
}
|
|
|
source.put(parentCode,objChildMap);
|
|
|
//其他算法 统一改为反射的方式 计算 TODO
|
|
|
}else {
|
|
|
objChildMap.put(cloumnCode,cloumnValue);
|
|
|
objChildMap.put(cloumnCode,baseCloumnValue);
|
|
|
}
|
|
|
source.put(parentCode,objChildMap);
|
|
|
}else if(childSaveType == 2 ){//nested 方式
|
|
@ -269,9 +292,9 @@ public class ElasticSearchDataProcessService {
|
|
|
for(Map<String, Object> map : childList){
|
|
|
if(subRowKey.equals(map.get(primaryKeyCode).toString())){
|
|
|
map.put(primaryKeyCode,subRowKey);
|
|
|
map.put(cloumnCode,dataConver(dataType,cloumnValue));
|
|
|
map.put(cloumnCode,dataConver(dataType,baseCloumnValue));
|
|
|
if(StringUtils.isNotEmpty(dict)){
|
|
|
map = extendDictData(map,cloumnCode,dict,cloumnValue);
|
|
|
map = extendDictData(map,cloumnCode,dict,baseCloumnValue);
|
|
|
}else if(StringUtils.isNotEmpty(algorithm)){
|
|
|
//其他算法
|
|
|
}
|
|
@ -282,9 +305,9 @@ public class ElasticSearchDataProcessService {
|
|
|
if( !isexist){
|
|
|
Map<String,Object> map = new HashMap<>();
|
|
|
map.put(primaryKeyCode,subRowKey);
|
|
|
map.put(cloumnCode,dataConver(dataType,cloumnValue));
|
|
|
map.put(cloumnCode,dataConver(dataType,baseCloumnValue));
|
|
|
if(StringUtils.isNotEmpty(dict)){
|
|
|
map = extendDictData(map,cloumnCode,dict,cloumnValue);
|
|
|
map = extendDictData(map,cloumnCode,dict,baseCloumnValue);
|
|
|
}else if(StringUtils.isNotEmpty(algorithm)){
|
|
|
//其他算法
|
|
|
}
|
|
@ -294,7 +317,7 @@ public class ElasticSearchDataProcessService {
|
|
|
}else{
|
|
|
Map<String,Object> map = new HashMap<>();
|
|
|
map.put(primaryKeyCode,subRowKey);
|
|
|
map.put(cloumnCode,dataConver(dataType,cloumnValue));
|
|
|
map.put(cloumnCode,dataConver(dataType,baseCloumnValue));
|
|
|
nestedList.add(map);
|
|
|
source.put(parentCode, nestedList);
|
|
|
}
|
|
@ -395,22 +418,6 @@ public class ElasticSearchDataProcessService {
|
|
|
}
|
|
|
|
|
|
|
|
|
/**
|
|
|
* @param dataMap
|
|
|
*/
|
|
|
public Map<String, Object> mysqlDataProcess(Map<String, Object> dataMap){
|
|
|
Map<String, Object> source = new HashMap<>();
|
|
|
String table = dataMap.get(table_k).toString();
|
|
|
String rowKey = dataMap.get(rowKey_k).toString();
|
|
|
|
|
|
|
|
|
//处理 数据库执行动作 增删改
|
|
|
//TODO
|
|
|
|
|
|
return source;
|
|
|
}
|
|
|
|
|
|
|
|
|
/**
|
|
|
* 数据类型转换
|
|
|
* @param dataType
|
|
@ -437,4 +444,12 @@ public class ElasticSearchDataProcessService {
|
|
|
return value;
|
|
|
}
|
|
|
|
|
|
public String converMapObject(Object object){
|
|
|
Object[] obj = (Object[]) object;
|
|
|
if(obj.length > 0){
|
|
|
return obj[0].toString();
|
|
|
}
|
|
|
return "";
|
|
|
}
|
|
|
|
|
|
}
|