|
@ -29,8 +29,7 @@ public class ElasticSearchDataProcessService {
|
|
|
private static Logger logger = LoggerFactory.getLogger(ElasticSearchDataProcessService.class);
|
|
|
private static String dataSource_hbase = "hbase";
|
|
|
private static String dataSource_mysql = "mysql";
|
|
|
private static String action_put = "Put";//添加和修改单个字段值
|
|
|
private static String action_del = "DeleteColumn";//删除单个字段值
|
|
|
private static String action_put = "Put"; //添加和修改单个字段值
|
|
|
private static String action_delFamily = "DeleteFamily";//删除整行
|
|
|
|
|
|
private static String dataSource_k = "dataSource";
|
|
@ -85,6 +84,8 @@ public class ElasticSearchDataProcessService {
|
|
|
//通过表找到 对应的数据集 保存的索引和type
|
|
|
//TODO 可以维护到数据字典 - 保存到redis 减少去数据库里面查询
|
|
|
|
|
|
//如果是子集的数据 hbase 过来要指定父级数据
|
|
|
|
|
|
String rowKey = dataMap.get(rowKey_k).toString();
|
|
|
String action = dataMap.get(action_k).toString();
|
|
|
dataMap.remove(table_k);
|
|
@ -104,39 +105,12 @@ public class ElasticSearchDataProcessService {
|
|
|
if(cubeMappingModels != null && cubeMappingModels.size() > 0){
|
|
|
for(CubeMappingModel cubeMappingModel :cubeMappingModels){
|
|
|
String cloumnCode = cubeMappingModel.getDimensionCode();
|
|
|
DictFunc dictFunc = new DictFunc();
|
|
|
//字典扩展
|
|
|
if(StringUtils.isNotEmpty(cubeMappingModel.getDict()) && StringUtils.isEmpty(cubeMappingModel.getAlgorithm())){
|
|
|
String value = "";
|
|
|
String param[] = {cubeMappingModel.getDict(),keyValue};
|
|
|
// value = dictFunc.execute(param);
|
|
|
value = key + "测试值";
|
|
|
source.put(cloumnCode,keyValue);
|
|
|
source.put(cloumnCode + "Name",value);
|
|
|
}else if(StringUtils.isNotEmpty(cubeMappingModel.getAlgorithm())){
|
|
|
//计算后 又经过字典 如:年龄段
|
|
|
if(cubeMappingModel.getAlgorithm().equals("AgeGroupFunc") && StringUtils.isNotEmpty(cubeMappingModel.getParm())){
|
|
|
String value = "";
|
|
|
AgeGroupFunc ageGroupFunc = new AgeGroupFunc();
|
|
|
String ageGroup = ageGroupFunc.execute(Integer.valueOf(keyValue));
|
|
|
String param[] = {cubeMappingModel.getDict(),ageGroup};
|
|
|
// value = dictFunc.execute(param);
|
|
|
value = "年龄段=" + ageGroup;
|
|
|
if(StringUtils.isNotEmpty(cubeMappingModel.getDict())){
|
|
|
source.put(cloumnCode,ageGroup);
|
|
|
source.put(cloumnCode + "Name",value);
|
|
|
}
|
|
|
}
|
|
|
//其他具体维度 对应具体算法
|
|
|
}else {
|
|
|
if(StringUtils.isNotEmpty(cubeMappingModel.getDataType())){
|
|
|
String dataType = cubeMappingModel.getDataType();
|
|
|
Object value = dataConver(dataType,keyValue);
|
|
|
source.put(cloumnCode,value);
|
|
|
}else{
|
|
|
source.put(cloumnCode,dataMap.get(key));
|
|
|
}
|
|
|
}
|
|
|
String dataType = cubeMappingModel.getDataType();
|
|
|
String dict = cubeMappingModel.getDict();
|
|
|
String algorithm = cubeMappingModel.getAlgorithm();
|
|
|
String algorithmParm = cubeMappingModel.getParm();
|
|
|
Map<String, Object> esDataMap = dataToMap(keyValue,cloumnCode,dataType ,dict,algorithm,algorithmParm);
|
|
|
source.putAll(esDataMap);
|
|
|
}
|
|
|
}
|
|
|
// 是否是 子集属性
|
|
@ -150,9 +124,41 @@ public class ElasticSearchDataProcessService {
|
|
|
if(childSaveType == 1 ){//对象方式
|
|
|
Map<String, Object> objChildMap = new HashMap<>();
|
|
|
objChildMap.put(cloumnCode,keyValue);
|
|
|
//字典扩展
|
|
|
if(StringUtils.isNotEmpty(cubeMemberMappingModel.getDict()) && StringUtils.isEmpty(cubeMemberMappingModel.getAlgorithm())){
|
|
|
String value = "";
|
|
|
String param[] = {cubeMemberMappingModel.getDict(),keyValue};
|
|
|
// value = dictFunc.execute(param);
|
|
|
value = key + "测试值";
|
|
|
source.put(cloumnCode,keyValue);
|
|
|
objChildMap.put(cloumnCode + "Name",value);
|
|
|
|
|
|
}else if(StringUtils.isNotEmpty(cubeMemberMappingModel.getAlgorithm())){
|
|
|
if(cubeMemberMappingModel.getAlgorithm().equals("DivisionFunc") && StringUtils.isNotEmpty(cubeMemberMappingModel.getParm())){
|
|
|
if(cubeMemberMappingModel.getParm().equals(key)){
|
|
|
DivisionFunc divisionFunc = new DivisionFunc();
|
|
|
String level = "1";
|
|
|
String divisionVal = "";
|
|
|
if(cloumnCode.toLowerCase().equals("town")){
|
|
|
level = "1";
|
|
|
divisionVal = "信州区";
|
|
|
}else if(cloumnCode.toLowerCase().equals("city")){
|
|
|
level = "2";
|
|
|
divisionVal = "上饶市";
|
|
|
}else if(cloumnCode.toLowerCase().equals("province")){
|
|
|
level = "3";
|
|
|
divisionVal = "江西省";
|
|
|
}
|
|
|
String divisionParam[] = {dataMap.get(key).toString(),level};
|
|
|
// divisionVal = divisionFunc.execute(divisionParam);
|
|
|
objChildMap.put(cloumnCode,divisionVal);
|
|
|
}
|
|
|
}
|
|
|
//其他算法 --
|
|
|
}
|
|
|
source.put(parentCode,objChildMap);
|
|
|
}
|
|
|
if(childSaveType == 2 ){//nested 方式
|
|
|
saveElasticSearchData(index, type,rowKey,source);
|
|
|
}else if(childSaveType == 2 ){//nested 方式
|
|
|
List<Map<String,Object>> nestedList = new ArrayList<>();
|
|
|
//查出历史数据 然后组合保存
|
|
|
String field = parentCode + "." + subRowKey_k;
|
|
@ -183,32 +189,19 @@ public class ElasticSearchDataProcessService {
|
|
|
}
|
|
|
}
|
|
|
source.put(parentCode,nestedList);
|
|
|
saveElasticSearchData(index, type,rowKey,source);
|
|
|
}
|
|
|
}else {
|
|
|
//字典扩展
|
|
|
if(StringUtils.isNotEmpty(cubeMemberMappingModel.getDict())){
|
|
|
if(StringUtils.isNotEmpty(cubeMemberMappingModel.getDict()) && StringUtils.isEmpty(cubeMemberMappingModel.getAlgorithm())){
|
|
|
String value = "";
|
|
|
DictFunc dictFunc = new DictFunc();
|
|
|
String param[] = {cubeMemberMappingModel.getDict(),cloumnCode};
|
|
|
// value = dictFunc.execute(param);
|
|
|
value = key + "测试值";
|
|
|
source.put(cloumnCode + "Name",value);
|
|
|
String param[] = {cubeMemberMappingModel.getDict(),keyValue};
|
|
|
// value = dictFunc.execute(param);
|
|
|
value = cloumnCode + "测试值";
|
|
|
source.put(cloumnCode,keyValue);
|
|
|
source.put(cloumnCode + "Name",value);
|
|
|
}else if(StringUtils.isNotEmpty(cubeMemberMappingModel.getAlgorithm())){
|
|
|
if(cubeMemberMappingModel.getAlgorithm().equals("DivisionFunc") && StringUtils.isNotEmpty(cubeMemberMappingModel.getParm())){
|
|
|
if(cubeMemberMappingModel.getParm().equals(key)){
|
|
|
DivisionFunc divisionFunc = new DivisionFunc();
|
|
|
String townParam[] = {dataMap.get(key).toString(),"1"};
|
|
|
String townVal = divisionFunc.execute(townParam);
|
|
|
String cityParam[] = {dataMap.get(key).toString(),"2"};
|
|
|
String cityVal = divisionFunc.execute(cityParam);
|
|
|
String provinceParam[] = {dataMap.get(key).toString(),"3"};
|
|
|
String provinceVal = divisionFunc.execute(provinceParam);
|
|
|
source.put("",townVal);
|
|
|
source.put("",cityVal);
|
|
|
source.put("",provinceVal);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
}else {
|
|
|
if(StringUtils.isNotEmpty(cubeMemberMappingModel.getDataType())){
|
|
|
String dataType = cubeMemberMappingModel.getDataType();
|
|
@ -219,17 +212,11 @@ public class ElasticSearchDataProcessService {
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
saveElasticSearchData(index, type,rowKey,source);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
}
|
|
|
elasticSearchUtil.update(index, type,rowKey, source);
|
|
|
}else if (action.contains(action_del)){
|
|
|
for(String key : dataMap.keySet()){
|
|
|
dataMap.put(key,"");
|
|
|
}
|
|
|
//保存数据
|
|
|
elasticSearchUtil.update(index, type, rowKey,dataMap);
|
|
|
|
|
|
}else if(action.contains(action_delFamily)){
|
|
|
elasticSearchUtil.delete(index,type,rowKey);
|
|
|
}
|
|
@ -244,6 +231,83 @@ public class ElasticSearchDataProcessService {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
public Map<String,Object> dataToMap(String sourceValue,String cloumnCode,String dataType ,String dict,String algorithm,String algorithmParm ){
|
|
|
Map<String, Object> source = new HashMap<>();
|
|
|
DictFunc dictFunc = new DictFunc();
|
|
|
//字典扩展
|
|
|
if(StringUtils.isNotEmpty(dict) && StringUtils.isEmpty(algorithm)){
|
|
|
String value = "";
|
|
|
String param[] = {dict,sourceValue};
|
|
|
// value = dictFunc.execute(param);
|
|
|
value = cloumnCode + "测试值";
|
|
|
source.put(cloumnCode,sourceValue);
|
|
|
source.put(cloumnCode + "Name",value);
|
|
|
}else if(StringUtils.isNotEmpty(algorithm)){
|
|
|
//计算后 又经过字典
|
|
|
// 如:年龄段
|
|
|
if(algorithm.equals("AgeGroupFunc") && StringUtils.isNotEmpty(algorithmParm)){
|
|
|
String value = "";
|
|
|
AgeGroupFunc ageGroupFunc = new AgeGroupFunc();
|
|
|
String ageGroup = ageGroupFunc.execute(Integer.valueOf(sourceValue));
|
|
|
String param[] = {dict,ageGroup};
|
|
|
// value = dictFunc.execute(param);
|
|
|
value = "年龄段=" + ageGroup;
|
|
|
if(StringUtils.isNotEmpty(dict)){
|
|
|
source.put(cloumnCode,ageGroup);
|
|
|
source.put(cloumnCode + "Name",value);
|
|
|
}
|
|
|
}
|
|
|
//区域算法
|
|
|
if(algorithm.equals("DivisionFunc") && StringUtils.isNotEmpty(algorithmParm)){
|
|
|
if(algorithmParm.equals(cloumnCode)){
|
|
|
DivisionFunc divisionFunc = new DivisionFunc();
|
|
|
String level = "1";
|
|
|
String divisionVal = "";
|
|
|
if(cloumnCode.toLowerCase().equals("town")){
|
|
|
level = "1";
|
|
|
divisionVal = "信州区";
|
|
|
}else if(cloumnCode.toLowerCase().equals("city")){
|
|
|
level = "2";
|
|
|
divisionVal = "上饶市";
|
|
|
}else if(cloumnCode.toLowerCase().equals("province")){
|
|
|
level = "3";
|
|
|
divisionVal = "江西省";
|
|
|
}
|
|
|
String divisionParam[] = {sourceValue,level};
|
|
|
// divisionVal = divisionFunc.execute(divisionParam);
|
|
|
source.put(cloumnCode,divisionVal);
|
|
|
}
|
|
|
}
|
|
|
//其他算法 --
|
|
|
}else {
|
|
|
if(StringUtils.isNotEmpty(dataType)){
|
|
|
Object value = dataConver(dataType,sourceValue);
|
|
|
source.put(cloumnCode,value);
|
|
|
}else{
|
|
|
source.put(cloumnCode,sourceValue);
|
|
|
}
|
|
|
}
|
|
|
return source;
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* 添加修改 数据
|
|
|
* @param index
|
|
|
* @param type
|
|
|
* @param rowKey
|
|
|
* @param source
|
|
|
* @throws ParseException
|
|
|
*/
|
|
|
public void saveElasticSearchData(String index,String type,String rowKey,Map<String, Object> source) throws ParseException {
|
|
|
Map<String, Object> data = elasticSearchUtil.findById(index, type, rowKey);
|
|
|
if(data != null){
|
|
|
elasticSearchUtil.update(index, type,rowKey,source);
|
|
|
}else {
|
|
|
elasticSearchUtil.index(index, type,source);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
|
|
|
/**
|
|
|
* @param dataMap
|
|
|
*/
|