123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274 |
- package com.yihu.quota.etl.extract.solr;
- import com.fasterxml.jackson.databind.ObjectMapper;
- import com.yihu.ehr.elasticsearch.ElasticSearchUtil;
- import com.yihu.ehr.query.common.model.SolrGroupEntity;
- import com.yihu.ehr.query.services.SolrQuery;
- import com.yihu.ehr.util.datetime.DateUtil;
- import com.yihu.quota.dao.save.TjQuotaDataSaveDao;
- import com.yihu.quota.etl.Contant;
- import com.yihu.quota.etl.ExtractConverUtil;
- import com.yihu.quota.etl.extract.ExtractUtil;
- import com.yihu.quota.etl.model.EsConfig;
- import com.yihu.quota.model.dimension.TjQuotaDimensionMain;
- import com.yihu.quota.model.dimension.TjQuotaDimensionSlave;
- import com.yihu.quota.model.save.TjQuotaDataSave;
- import com.yihu.quota.vo.FilterModel;
- import com.yihu.quota.vo.QuotaVo;
- import com.yihu.quota.vo.SaveModel;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.context.annotation.Scope;
- import org.springframework.stereotype.Component;
- import org.springframework.util.StringUtils;
- import java.io.IOException;
- import java.util.*;
- /**
- * 对solr抽取数据,基于指标配置维度(不能包括按周、月、年等时间维度) +
- * 默认按天维度统计值作为最小单位分组聚合,
- * 保存聚合结果到ES。二次统计基于以上的聚合结果进行统计。
- * <p>
- * Created by janseny on 2017/7/10.
- */
- @Deprecated
- @Component
- @Scope("prototype")
- public class SolrExtract {
- private Logger logger = LoggerFactory.getLogger(SolrExtract.class);
- @Autowired
- private ExtractUtil extractUtil;
- @Autowired
- private SolrQuery solrQuery;
- @Autowired
- private ExtractConverUtil extractConverUtil;
- @Autowired
- private ElasticSearchUtil esUtil;
- @Autowired
- private TjQuotaDataSaveDao tjQuotaDataSaveDao;
- @Autowired
- private ObjectMapper objectMapper;
- private QuotaVo quotaVo;
- private String startTime;
- private String endTime;
- private String timeLevel;
- private EsConfig esConfig;
- public List<SaveModel> extract(List<TjQuotaDimensionMain> qdm,//主维度
- List<TjQuotaDimensionSlave> qds,//细维度
- String startTime,//开始时间
- String endTime, //结束时间
- String timeLevel, // 时间维度,默认且只按天统计
- QuotaVo quotaVo,//指标配置
- EsConfig esConfig //es配置
- ) throws Exception {
- this.startTime = startTime;
- this.endTime = endTime;
- this.timeLevel = timeLevel;
- this.quotaVo = quotaVo;
- this.esConfig = esConfig;
- solrQuery.initParams(this.startTime, this.endTime);
- // 统计数据
- return statiscSlor(qdm, qds, quotaVo);
- }
- public int getExtractTotal(String startTime, String endTime, EsConfig esConfig) throws Exception {
- this.startTime = startTime;
- this.endTime = endTime;
- this.esConfig = esConfig;
- solrQuery.initParams(this.startTime, this.endTime);
- String core = esConfig.getTable(); // solr的core名
- String q = null; // 过滤条件
- // 统计数据数量
- String timeKey = esConfig.getTimekey();
- if (!StringUtils.isEmpty(timeKey)) {
- if (!StringUtils.isEmpty(startTime) && !StringUtils.isEmpty(endTime)) {
- q = String.format("%s:[%s TO %s]", timeKey, startTime, endTime);
- } else {
- q = timeKey + ":[* TO *]";
- }
- }
- long rows = solrQuery.count(core, q, esConfig.getFilter());
- return Integer.valueOf(String.valueOf(rows));
- }
- public List<SaveModel> statiscSlor(List<TjQuotaDimensionMain> qdm,
- List<TjQuotaDimensionSlave> qds,
- QuotaVo quotaVo) throws Exception {
- List<SaveModel> returnList = new ArrayList<>();
- String core = esConfig.getTable(); // solr的core名
- String q = null; // 查询条件
- String fq = null; // 过滤条件
- if (esConfig.getFilter() != null) {
- fq = esConfig.getFilter();
- }
- String fl = ""; // 结果指定查询字段
- List<SolrGroupEntity> dimensionGroupList = new ArrayList<>(); // 维度分组统计条件
- if (StringUtils.isEmpty(esConfig.getTimekey())) {
- throw new Exception("数据源配置 timeKey 不能为空!");
- }
- String timeKey = esConfig.getTimekey();
- Map<String, String> mainMap = new HashMap<>();
- Map<String, String> slaveMap = new HashMap<>();
- for (int i = 0; i < qdm.size(); i++) {
- String key = qdm.get(i).getKeyVal();
- mainMap.put(key, key);
- dimensionGroupList.add(new SolrGroupEntity(key, SolrGroupEntity.GroupType.FIELD_VALUE));
- fl += key + ",";
- if (qdm.get(i).getMainCode().equals("org")) {
- String orgFilter = " AND org_code:*";
- if (!StringUtils.isEmpty(fq)) {
- fq += orgFilter;
- } else {
- fq = orgFilter;
- }
- }
- if (qdm.get(i).getMainCode().equals("town")) {
- String townFilter = " AND org_area:*";
- if (!StringUtils.isEmpty(fq)) {
- fq += townFilter;
- } else {
- fq = townFilter;
- }
- }
- }
- for (int i = 0; i < qds.size(); i++) {
- String key = qds.get(i).getKeyVal();
- slaveMap.put(key, key);
- dimensionGroupList.add(new SolrGroupEntity(key, SolrGroupEntity.GroupType.FIELD_VALUE));
- fl += key + ",";
- }
- fl += timeKey + ",rowkey";
- if (StringUtils.isEmpty(esConfig.getAggregation()) || (!esConfig.getAggregation().equals(Contant.quota.aggregation_list) && !esConfig.getAggregation().equals(Contant.quota.aggregation_distinct))) {
- // 默认追加一个日期字段作为细维度,方便按天统计作为最小单位统计值。
- slaveMap.put(timeKey, timeKey);
- TjQuotaDimensionSlave daySlave = new TjQuotaDimensionSlave();
- daySlave.setSlaveCode(timeKey);
- daySlave.setKeyVal(timeKey);
- qds.add(daySlave);
- dimensionGroupList.add(new SolrGroupEntity(timeKey, SolrGroupEntity.GroupType.DATE_RANGE, "+1DAY"));
- }
- // 拼接增量或全量的筛选条件
- if (!StringUtils.isEmpty(timeKey)) {
- if (!StringUtils.isEmpty(startTime) && !StringUtils.isEmpty(endTime)) {
- q = String.format("%s:[%s TO %s]", timeKey, startTime, endTime);
- } else {
- q = timeKey + ":[* TO *]";
- }
- }
- boolean listFlag = false;
- // 最后一个维度基于其他维度组合作为条件的统计结果的集合
- List<Map<String, Object>> list = new ArrayList<>();
- if (StringUtils.isEmpty(esConfig.getAggregation())
- || Contant.quota.aggregation_count.equals(esConfig.getAggregation())) {
- // count 聚合
- list = solrQuery.getCountMultList(core, q, fq, dimensionGroupList, null);
- } else if (!StringUtils.isEmpty(esConfig.getAggregationKey()) && Contant.quota.aggregation_sum.equals(esConfig.getAggregation())) {
- // sum 聚合
- list = solrQuery.getSumMultList(core, q, fq, esConfig.getAggregationKey(), dimensionGroupList, null);
- } else if (Contant.quota.aggregation_list.equals(esConfig.getAggregation())) {
- listFlag = true;
- // 查询列表
- try {
- if (esConfig.getAggregation().equals(Contant.quota.aggregation_list) && !StringUtils.isEmpty(esConfig.getAggregationKey())) {
- fl = fl + "," + esConfig.getAggregationKey();
- }
- logger.warn("solr 从" + quotaVo.getStart() + " 开始获取数据,这次准备获取" + quotaVo.getRows() + "条");
- list = solrQuery.queryReturnFieldList(core, q, fq, null, quotaVo.getStart(), quotaVo.getRows(), fl.split(","));
- } catch (Exception e) {
- throw new Exception("solr 查询异常 " + e.getMessage());
- }
- } else if (Contant.quota.aggregation_distinct.equals(esConfig.getAggregation())) {
- listFlag = true;
- // 去重查询
- fl += "," + esConfig.getDistinctGroupField();
- boolean groupNullIsolate = esConfig.getDistinctGroupNullIsolate() != null ? esConfig.getDistinctGroupNullIsolate() : false;
- int count = (int) solrQuery.count(core, q, fq);
- list = solrQuery.distinctQueryReturnFieldList(core, q, fq, null, 0, count, fl.split(","),
- esConfig.getDistinctGroupField(), esConfig.getDistinctGroupSort(), groupNullIsolate);
- // 对比ES中如果已存在该条数据则更新,并从集合中移除该条数据。
- checkEsDistinctData(list);
- }
- //数据转换
- FilterModel filterModel = new FilterModel(list, null);
- filterModel = extractConverUtil.convert(filterModel, qds);
- if (filterModel != null && filterModel.getDataList() != null) {
- list = filterModel.getDataList();
- }
- Map<String, String> statisticsResultMap = new LinkedHashMap<>(); // 统计结果集
- Map<String, String> daySlaveDictMap = new LinkedHashMap<>(); // 按天统计的所有日期项
- if (listFlag) {
- if (list != null && list.size() > 0) {
- returnList = extractUtil.computeList(qdm, qds, list, timeKey, esConfig.getAggregationKey(), quotaVo);
- }
- } else {
- if (list != null && list.size() > 0) {
- for (Map<String, Object> objectMap : list) {
- String statisticsKey = objectMap.get("$statisticsKey").toString();
- String result = objectMap.get("$result").toString();
- String quotaDate = objectMap.get(timeKey).toString();
- statisticsResultMap.put(statisticsKey, result);
- daySlaveDictMap.put(statisticsKey, quotaDate);
- }
- }
- // 融合主细维度、其组合统计值为SaveModel
- extractUtil.compute(qdm, qds, returnList, statisticsResultMap, daySlaveDictMap, quotaVo);
- }
- return returnList;
- }
- /**
- * 去重查询后,对比 ES 中如果已存在该条数据,且去重集合中有更前的则更新quotaDate,并从去重集合中移除该条数据。
- *
- * @param distinctQueryList 去重查询结果集
- */
- private void checkEsDistinctData(List<Map<String, Object>> distinctQueryList) throws IOException {
- List<Map<String, Object>> updateList = new ArrayList<>();
- String esIndex = null;
- String esType = null;
- if (distinctQueryList.size() > 0) {
- TjQuotaDataSave quotaDataSave = tjQuotaDataSaveDao.findByQuotaCode(quotaVo.getCode());
- Map<String, Object> configJson = objectMapper.readValue(quotaDataSave.getConfigJson(), Map.class);
- esIndex = configJson.get("index").toString();
- esType = configJson.get("type").toString();
- }
- Iterator<Map<String, Object>> iterator = distinctQueryList.iterator();
- while (iterator.hasNext()) {
- Map<String, Object> item = iterator.next();
- String timeKeyValue = DateUtil.formatDate((Date) item.get(esConfig.getTimekey()), DateUtil.DEFAULT_DATE_YMD_FORMAT);
- String distinctField = item.get("distinctField").toString();
- String distinctFieldValue = item.get("distinctFieldValue").toString();
- String filters = "quotaCode=" + quotaVo.getCode().replace("_", "") + ";distinctField=" + distinctField + ";distinctFieldValue=" + distinctFieldValue;
- List<Map<String, Object>> existedList = esUtil.list(esIndex, esType, filters);
- if (existedList.size() > 0) {
- Map<String, Object> quotaData = existedList.get(0);
- String quotaDate = quotaData.get("quotaDate").toString();
- if (DateUtil.compareDate(DateUtil.DEFAULT_DATE_YMD_FORMAT, timeKeyValue, quotaDate) < 0) {
- quotaData.put("quotaDate", timeKeyValue);
- updateList.add(quotaData);
- iterator.remove();
- }
- }
- }
- if (updateList.size() != 0) {
- esUtil.bulkUpdate(esIndex, esType, updateList);
- }
- }
- }
|