|
@ -5,6 +5,7 @@ import com.google.gson.Gson;
|
|
|
import com.yihu.ehr.elasticsearch.ElasticSearchPool;
|
|
|
import com.yihu.ehr.elasticsearch.ElasticSearchUtil;
|
|
|
import com.yihu.ehr.util.datetime.DateUtil;
|
|
|
import com.yihu.quota.etl.formula.AgeGroupFunc;
|
|
|
import com.yihu.quota.etl.formula.DictFunc;
|
|
|
import com.yihu.quota.etl.formula.DivisionFunc;
|
|
|
import com.yihu.quota.util.ElasticSearchHandler;
|
|
@ -34,6 +35,12 @@ public class ElasticSearchDataProcessService {
|
|
|
private static String action_put = "Put";//添加和修改单个字段值
|
|
|
private static String action_del = "DeleteColumn";//删除单个字段值
|
|
|
private static String action_delFamily = "DeleteFamily";//删除整行
|
|
|
|
|
|
private static String dataSource_k = "dataSource";
|
|
|
private static String table_k = "table";
|
|
|
private static String rowKey_k = "rowkey";
|
|
|
private static String action_k = "action";
|
|
|
|
|
|
@Autowired
|
|
|
private ObjectMapper objectMapper;
|
|
|
@Autowired
|
|
@ -44,8 +51,6 @@ public class ElasticSearchDataProcessService {
|
|
|
private ElasticSearchPool elasticSearchPool;
|
|
|
@Autowired
|
|
|
private ElasticSearchUtil elasticSearchUtil;
|
|
|
@Autowired
|
|
|
private ElasticSearchHandler elasticSearchHandler;
|
|
|
|
|
|
/**
|
|
|
*
|
|
@ -55,9 +60,9 @@ public class ElasticSearchDataProcessService {
|
|
|
try {
|
|
|
Gson gson = new Gson();
|
|
|
Map<String, Object> dataMap = gson.fromJson(data, Map.class);
|
|
|
if(dataMap.containsKey("dataSource")){
|
|
|
String dataSource = dataMap.get("dataSource").toString();
|
|
|
dataMap.remove("dataSource");
|
|
|
if(dataMap.containsKey(dataSource_k)){
|
|
|
String dataSource = dataMap.get(dataSource_k).toString();
|
|
|
dataMap.remove(dataSource_k);
|
|
|
if(dataSource.toLowerCase().equals(dataSource_hbase)){
|
|
|
hbaseDataProcess(dataMap);
|
|
|
}else if(dataSource.toLowerCase().equals(dataSource_mysql)){
|
|
@ -77,15 +82,15 @@ public class ElasticSearchDataProcessService {
|
|
|
Map<String, Object> source = new HashMap<>();
|
|
|
String index = "";
|
|
|
String type = "";
|
|
|
String table = dataMap.get("table").toString();
|
|
|
String table = dataMap.get(table_k).toString();
|
|
|
//通过表找到 对应的数据集 保存的索引和type
|
|
|
//TODO 可以维护到数据字典 - 保存到redis 减少去数据库里面查询
|
|
|
|
|
|
String rowKey = dataMap.get("rowKey").toString();
|
|
|
String action = dataMap.get("action").toString();
|
|
|
dataMap.remove("table");
|
|
|
dataMap.remove("rowKey");
|
|
|
dataMap.remove("action");
|
|
|
String rowKey = dataMap.get(rowKey_k).toString();
|
|
|
String action = dataMap.get(action_k).toString();
|
|
|
dataMap.remove(table_k);
|
|
|
dataMap.remove(rowKey_k);
|
|
|
dataMap.remove(action_k);
|
|
|
try {
|
|
|
if(action.contains(action_put)){
|
|
|
String keyValue = "";
|
|
@ -100,17 +105,30 @@ public class ElasticSearchDataProcessService {
|
|
|
if(cubeMappingModels != null && cubeMappingModels.size() > 0){
|
|
|
for(CubeMappingModel cubeMappingModel :cubeMappingModels){
|
|
|
String cloumnCode = cubeMappingModel.getDimensionCode();
|
|
|
DictFunc dictFunc = new DictFunc();
|
|
|
//字典扩展
|
|
|
if(StringUtils.isNotEmpty(cubeMappingModel.getDict())){
|
|
|
DictFunc dictFunc = new DictFunc();
|
|
|
String param[] = {cubeMappingModel.getDict(),cloumnCode};
|
|
|
if(StringUtils.isNotEmpty(cubeMappingModel.getDict()) && StringUtils.isEmpty(cubeMappingModel.getAlgorithm())){
|
|
|
String param[] = {cubeMappingModel.getDict(),keyValue};
|
|
|
String value = dictFunc.execute(param);
|
|
|
String dictCode = cloumnCode + ".Code";
|
|
|
String dictName = cloumnCode + ".Name";
|
|
|
source.put(dictCode,key);
|
|
|
source.put(dictName,value);
|
|
|
}else if(StringUtils.isNotEmpty(cubeMappingModel.getAlgorithm())){
|
|
|
//具体维度 对应具体算法
|
|
|
//计算后 又经过字典 如:年龄段
|
|
|
if(cubeMappingModel.getAlgorithm().equals("AgeGroupFunc") && StringUtils.isNotEmpty(cubeMappingModel.getParm())){
|
|
|
AgeGroupFunc ageGroupFunc = new AgeGroupFunc();
|
|
|
String ageGroup = ageGroupFunc.execute(keyValue);
|
|
|
String param[] = {cubeMappingModel.getDict(),ageGroup};
|
|
|
String value = dictFunc.execute(param);
|
|
|
if(StringUtils.isNotEmpty(cubeMappingModel.getDict())){
|
|
|
String dictCode = cloumnCode + ".Code";
|
|
|
String dictName = cloumnCode + ".Name";
|
|
|
source.put(dictCode,key);
|
|
|
source.put(dictName,value);
|
|
|
}
|
|
|
}
|
|
|
//其他具体维度 对应具体算法
|
|
|
}else {
|
|
|
if(StringUtils.isNotEmpty(cubeMappingModel.getDataType())){
|
|
|
String dataType = cubeMappingModel.getDataType();
|
|
@ -130,45 +148,57 @@ public class ElasticSearchDataProcessService {
|
|
|
if(cubeMemberMappingModel.getChildSaveType() != null){
|
|
|
int childSaveType = cubeMemberMappingModel.getChildSaveType();
|
|
|
if(childSaveType == 1 ){//对象方式
|
|
|
|
|
|
//拼装成数据结构
|
|
|
String field = parentCode + "." + cloumnCode;
|
|
|
source.put(field,keyValue);
|
|
|
}
|
|
|
if(childSaveType == 2 ){//nested 方式
|
|
|
//查出历史数据 然后组合保存
|
|
|
}
|
|
|
}else {
|
|
|
|
|
|
}
|
|
|
//字典扩展
|
|
|
if(StringUtils.isNotEmpty(cubeMemberMappingModel.getDict())){
|
|
|
DictFunc dictFunc = new DictFunc();
|
|
|
String param[] = {cubeMemberMappingModel.getDict(),cloumnCode};
|
|
|
String value = dictFunc.execute(param);
|
|
|
String dictCode = cloumnCode + ".Code";
|
|
|
String dictName = cloumnCode + ".Name";
|
|
|
source.put(dictCode,key);
|
|
|
source.put(dictName,value);
|
|
|
}else if(StringUtils.isNotEmpty(cubeMemberMappingModel.getAlgorithm())){
|
|
|
if(cubeMemberMappingModel.getAlgorithm().equals("DivisionFunc") && StringUtils.isNotEmpty(cubeMemberMappingModel.getParm())){
|
|
|
if(cubeMemberMappingModel.getParm().equals(key)){
|
|
|
DivisionFunc divisionFunc = new DivisionFunc();
|
|
|
String townParam[] = {dataMap.get(key).toString(),"1"};
|
|
|
String townVal = divisionFunc.execute(townParam);
|
|
|
String cityParam[] = {dataMap.get(key).toString(),"2"};
|
|
|
String cityVal = divisionFunc.execute(cityParam);
|
|
|
String provinceParam[] = {dataMap.get(key).toString(),"3"};
|
|
|
String provinceVal = divisionFunc.execute(provinceParam);
|
|
|
source.put("",townVal);
|
|
|
source.put("",cityVal);
|
|
|
source.put("",provinceVal);
|
|
|
String field = parentCode + ".subRowkey";
|
|
|
List<Map<String, Object>> subDataList = elasticSearchUtil.findByField(index, type, field, rowKey);
|
|
|
if(subDataList != null && subDataList.size() > 0){
|
|
|
String parentRowkey = subDataList.get(0).get(rowKey_k).toString();
|
|
|
List<Map<String, Object>> dataList = elasticSearchUtil.findByField(index, type, rowKey_k, parentRowkey);
|
|
|
if(dataList != null && dataList.size() > 0){
|
|
|
//组装 子集历史数据 在添加
|
|
|
}else {
|
|
|
//单条添加
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
}else {
|
|
|
if(StringUtils.isNotEmpty(cubeMemberMappingModel.getDataType())){
|
|
|
String dataType = cubeMemberMappingModel.getDataType();
|
|
|
Object value = dataConver(dataType,keyValue);
|
|
|
source.put(cloumnCode,value);
|
|
|
}else{
|
|
|
source.put(cloumnCode,dataMap.get(key));
|
|
|
//字典扩展
|
|
|
if(StringUtils.isNotEmpty(cubeMemberMappingModel.getDict())){
|
|
|
DictFunc dictFunc = new DictFunc();
|
|
|
String param[] = {cubeMemberMappingModel.getDict(),cloumnCode};
|
|
|
String value = dictFunc.execute(param);
|
|
|
String dictCode = cloumnCode + ".Code";
|
|
|
String dictName = cloumnCode + ".Name";
|
|
|
source.put(dictCode,key);
|
|
|
source.put(dictName,value);
|
|
|
}else if(StringUtils.isNotEmpty(cubeMemberMappingModel.getAlgorithm())){
|
|
|
if(cubeMemberMappingModel.getAlgorithm().equals("DivisionFunc") && StringUtils.isNotEmpty(cubeMemberMappingModel.getParm())){
|
|
|
if(cubeMemberMappingModel.getParm().equals(key)){
|
|
|
DivisionFunc divisionFunc = new DivisionFunc();
|
|
|
String townParam[] = {dataMap.get(key).toString(),"1"};
|
|
|
String townVal = divisionFunc.execute(townParam);
|
|
|
String cityParam[] = {dataMap.get(key).toString(),"2"};
|
|
|
String cityVal = divisionFunc.execute(cityParam);
|
|
|
String provinceParam[] = {dataMap.get(key).toString(),"3"};
|
|
|
String provinceVal = divisionFunc.execute(provinceParam);
|
|
|
source.put("",townVal);
|
|
|
source.put("",cityVal);
|
|
|
source.put("",provinceVal);
|
|
|
}
|
|
|
}
|
|
|
}else {
|
|
|
if(StringUtils.isNotEmpty(cubeMemberMappingModel.getDataType())){
|
|
|
String dataType = cubeMemberMappingModel.getDataType();
|
|
|
Object value = dataConver(dataType,keyValue);
|
|
|
source.put(cloumnCode,value);
|
|
|
}else{
|
|
|
source.put(cloumnCode,dataMap.get(key));
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
}
|
|
@ -201,8 +231,8 @@ public class ElasticSearchDataProcessService {
|
|
|
*/
|
|
|
public Map<String, Object> mysqlDataProcess(Map<String, Object> dataMap){
|
|
|
Map<String, Object> source = new HashMap<>();
|
|
|
String table = dataMap.get("table").toString();
|
|
|
String rowKey = dataMap.get("rowKey").toString();
|
|
|
String table = dataMap.get(table_k).toString();
|
|
|
String rowKey = dataMap.get(rowKey_k).toString();
|
|
|
|
|
|
|
|
|
//处理 数据库执行动作 增删改
|