|
@ -11,6 +11,7 @@ import com.yihu.quota.etl.formula.DivisionFunc;
|
|
|
import com.yihu.quota.util.ElasticSearchHandler;
|
|
|
import com.yihu.quota.vo.CubeMappingModel;
|
|
|
import com.yihu.quota.vo.CubeMemberMappingModel;
|
|
|
import net.bytebuddy.implementation.bytecode.Throw;
|
|
|
import org.apache.commons.lang.StringUtils;
|
|
|
import org.slf4j.Logger;
|
|
|
import org.slf4j.LoggerFactory;
|
|
@ -74,7 +75,8 @@ public class ElasticSearchDataProcessService {
|
|
|
|
|
|
/**
|
|
|
* @param dataMap
|
|
|
* //如果是子集的数据 hbase 过来要指定父级数据
|
|
|
* 如果是子集的数据 hbase 过来要指定父级数据
|
|
|
* 如果是 删除 子表中的一行数据,es 这边就无法确定删除哪个数据
|
|
|
*/
|
|
|
public void hbaseDataProcess(Map<String, Object> dataMap) throws Exception{
|
|
|
Map<String, Object> source = new HashMap<>();
|
|
@ -96,9 +98,6 @@ public class ElasticSearchDataProcessService {
|
|
|
if(dataMap.containsKey(action_k)){
|
|
|
action = dataMap.get(action_k).toString();
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
try {
|
|
|
dataMap.remove(table_k);
|
|
|
dataMap.remove(rowKey_k);
|
|
@ -136,16 +135,12 @@ public class ElasticSearchDataProcessService {
|
|
|
String type = cubeMemberMappingModel.getIndexType();
|
|
|
if(action.contains(action_put)){
|
|
|
//维度成员数据扩展保存
|
|
|
source = dimensionMemberDataExtendToMap(cubeMemberMappingModel,hbaseColCode,keyValue,rowKey);
|
|
|
source = dimensionMemberDataExtendToMap(cubeMemberMappingModel,keyValue,rowKey,profileId);
|
|
|
saveElasticSearchData(index, type,profileId,source);
|
|
|
}else if(action.contains(action_del)){
|
|
|
elasticSearchUtil.delete(index,type,profileId);
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
|
|
|
|
|
|
}catch (ParseException e){
|
|
|
logger.debug("elasticSearch 执行失败");
|
|
|
e.printStackTrace();
|
|
@ -158,7 +153,7 @@ public class ElasticSearchDataProcessService {
|
|
|
|
|
|
/**
|
|
|
* 维度数据扩展 转map
|
|
|
* @param sourceValue
|
|
|
* @param cloumnValue
|
|
|
* @param cloumnCode
|
|
|
* @param dataType
|
|
|
* @param dict
|
|
@ -166,27 +161,27 @@ public class ElasticSearchDataProcessService {
|
|
|
* @param algorithmParm
|
|
|
* @return
|
|
|
*/
|
|
|
public Map<String,Object> dimensionDataExtendToMap(String sourceValue,String cloumnCode,String dataType ,String dict,String algorithm,String algorithmParm ){
|
|
|
public Map<String,Object> dimensionDataExtendToMap(String cloumnValue,String cloumnCode,String dataType ,String dict,String algorithm,String algorithmParm ){
|
|
|
Map<String, Object> source = new HashMap<>();
|
|
|
//字典扩展
|
|
|
if(StringUtils.isNotEmpty(dict) && StringUtils.isEmpty(algorithm)){
|
|
|
source = extendDictData(source,cloumnCode,dict,sourceValue);
|
|
|
source = extendDictData(source,cloumnCode,dict,cloumnValue);
|
|
|
}else if(StringUtils.isNotEmpty(algorithm)){
|
|
|
//年龄段
|
|
|
if(algorithm.equals("AgeGroupFunc")){
|
|
|
source = extendAgeGroupData(source,cloumnCode,dict,sourceValue);
|
|
|
source = extendAgeGroupData(source,cloumnCode,dict,cloumnValue);
|
|
|
}
|
|
|
//区域算法
|
|
|
if(algorithm.equals("DivisionFunc") && StringUtils.isNotEmpty(algorithmParm)){
|
|
|
source = extendDivisionData(source, cloumnCode, algorithmParm, sourceValue);
|
|
|
source = extendDivisionData(source, cloumnCode, algorithmParm, cloumnValue);
|
|
|
}
|
|
|
//其他算法 --
|
|
|
}else {
|
|
|
if(StringUtils.isNotEmpty(dataType)){
|
|
|
Object value = dataConver(dataType,sourceValue);
|
|
|
Object value = dataConver(dataType,cloumnValue);
|
|
|
source.put(cloumnCode,value);
|
|
|
}else{
|
|
|
source.put(cloumnCode,sourceValue);
|
|
|
source.put(cloumnCode,cloumnValue);
|
|
|
}
|
|
|
}
|
|
|
return source;
|
|
@ -195,12 +190,11 @@ public class ElasticSearchDataProcessService {
|
|
|
/**
|
|
|
* 维度成员数据扩展 转map
|
|
|
* @param cubeMemberMappingModel
|
|
|
* @param key
|
|
|
* @param sourceValue
|
|
|
* @param rowKey
|
|
|
* @param cloumnValue
|
|
|
* @param subRowKey
|
|
|
* @return
|
|
|
*/
|
|
|
public Map<String,Object> dimensionMemberDataExtendToMap(CubeMemberMappingModel cubeMemberMappingModel ,String key,String sourceValue,String rowKey) {
|
|
|
public Map<String,Object> dimensionMemberDataExtendToMap(CubeMemberMappingModel cubeMemberMappingModel,String cloumnValue,String subRowKey,String profileId) throws Exception {
|
|
|
Map<String, Object> source = new HashMap<>();
|
|
|
String cloumnCode = cubeMemberMappingModel.getDimensionCode();
|
|
|
String parentCode = cubeMemberMappingModel.getParentCode();
|
|
@ -214,18 +208,18 @@ public class ElasticSearchDataProcessService {
|
|
|
|
|
|
if(childSaveType == 1 ){//对象方式
|
|
|
Map<String, Object> objChildMap = new HashMap<>();
|
|
|
objChildMap.put(cloumnCode,sourceValue);
|
|
|
objChildMap.put(cloumnCode,cloumnValue);
|
|
|
if(StringUtils.isNotEmpty(dict)){
|
|
|
objChildMap = extendDictData(objChildMap,cloumnCode,dict,sourceValue);
|
|
|
objChildMap = extendDictData(objChildMap,cloumnCode,dict,cloumnValue);
|
|
|
source.put(parentCode,objChildMap);
|
|
|
}else if(StringUtils.isNotEmpty(algorithm)){
|
|
|
//年龄段
|
|
|
if(algorithm.equals("AgeGroupFunc")){
|
|
|
objChildMap = extendAgeGroupData(objChildMap,cloumnCode,dict,sourceValue);
|
|
|
objChildMap = extendAgeGroupData(objChildMap,cloumnCode,dict,cloumnValue);
|
|
|
}
|
|
|
//区域算法
|
|
|
if(algorithm.equals("DivisionFunc") && StringUtils.isNotEmpty(algorithmParm)){
|
|
|
objChildMap = extendDivisionData(objChildMap, cloumnCode, algorithmParm, sourceValue);
|
|
|
objChildMap = extendDivisionData(objChildMap, cloumnCode, algorithmParm, cloumnValue);
|
|
|
}
|
|
|
source.put(parentCode,objChildMap);
|
|
|
//其他算法 --
|
|
@ -234,39 +228,39 @@ public class ElasticSearchDataProcessService {
|
|
|
}else if(childSaveType == 2 ){//nested 方式
|
|
|
List<Map<String,Object>> nestedList = new ArrayList<>();
|
|
|
//查出历史数据 然后组合保存
|
|
|
String field = parentCode + "." + subRowKey_k;
|
|
|
List<Map<String, Object>> subDataList = elasticSearchUtil.findByField(index, type, field, rowKey);
|
|
|
if(subDataList != null && subDataList.size() > 0){
|
|
|
String parentRowkey = subDataList.get(0).get(rowKey_k).toString();
|
|
|
List<Map<String, Object>> dataList = elasticSearchUtil.findByField(index, type, rowKey_k, parentRowkey);
|
|
|
if(dataList != null && dataList.size() > 0){
|
|
|
//组装 子集历史数据,更改当前字段值 在添加
|
|
|
for(Map<String, Object> map : dataList){
|
|
|
if(map.get(subRowKey_k).equals(rowKey)){
|
|
|
for(String colKey :map.keySet()){
|
|
|
if(colKey.equals(cloumnCode)){
|
|
|
map.put(key,sourceValue);
|
|
|
Map<String, Object> oldMataMap = elasticSearchUtil.findById(index, type, profileId);
|
|
|
if(oldMataMap != null && oldMataMap.size() > 0){
|
|
|
//组装 子集历史数据,更改当前字段值 在添加
|
|
|
List<Map<String, Object>> childList = (List<Map<String, Object>>)oldMataMap.get(parentCode);
|
|
|
if(childList != null && childList.size() > 0){
|
|
|
for(Map<String, Object> map : childList){
|
|
|
if(oldMataMap.get(subRowKey_k) != null){
|
|
|
if(oldMataMap.get(subRowKey_k).toString().equals(subRowKey)){
|
|
|
for(String colKey :map.keySet()) {
|
|
|
if (colKey.equals(cloumnCode)) {
|
|
|
map.put(cloumnCode, cloumnValue);
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
nestedList.add(map);
|
|
|
}else{
|
|
|
nestedList.add(map);
|
|
|
}
|
|
|
nestedList.add(map);
|
|
|
}
|
|
|
source.put(parentCode, nestedList);
|
|
|
}else{
|
|
|
Map<String,Object> map = new HashMap<>();
|
|
|
map.put(subRowKey_k,subRowKey);
|
|
|
if(StringUtils.isNotEmpty(dataType)){
|
|
|
Object value = dataConver(dataType,cloumnValue);
|
|
|
map.put(cloumnCode,value);
|
|
|
}else{
|
|
|
map.put(cloumnCode,cloumnValue);
|
|
|
}
|
|
|
}else {
|
|
|
//库中没有记录,单条添加
|
|
|
Map<String, Object> nestedChildMap = new HashMap<>();
|
|
|
nestedChildMap.put(cloumnCode,sourceValue);
|
|
|
nestedChildMap.put(subRowKey_k,rowKey);
|
|
|
nestedList.add(nestedChildMap);
|
|
|
nestedList.add(map);
|
|
|
source.put(parentCode, nestedList);
|
|
|
}
|
|
|
}else{
|
|
|
Map<String, Object> map = new HashMap<>();
|
|
|
map.put("subRowkey",rowKey);
|
|
|
map.put(cloumnCode,sourceValue);
|
|
|
nestedList.add(map);
|
|
|
throw new Exception("没有找到数据,无法更新");
|
|
|
}
|
|
|
source.put(parentCode,nestedList);
|
|
|
}
|
|
|
return source;
|
|
|
}
|