ElasticsearchUtil.java 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371
  1. package com.yihu.wlyy.statistics.util;
  2. import com.alibaba.druid.sql.ast.SQLExpr;
  3. import com.alibaba.druid.sql.ast.expr.SQLQueryExpr;
  4. import com.alibaba.druid.sql.parser.SQLExprParser;
  5. import com.yihu.wlyy.statistics.etl.save.es.ElasticFactory;
  6. import com.yihu.wlyy.statistics.vo.SaveModel;
  7. import org.elasticsearch.action.search.SearchResponse;
  8. import org.nlpcn.es4sql.domain.Select;
  9. import org.nlpcn.es4sql.jdbc.ObjectResult;
  10. import org.nlpcn.es4sql.jdbc.ObjectResultsExtractor;
  11. import org.nlpcn.es4sql.parse.ElasticSqlExprParser;
  12. import org.nlpcn.es4sql.parse.SqlParser;
  13. import org.nlpcn.es4sql.query.AggregationQueryAction;
  14. import org.nlpcn.es4sql.query.DefaultQueryAction;
  15. import org.nlpcn.es4sql.query.SqlElasticSearchRequestBuilder;
  16. import org.springframework.beans.factory.annotation.Autowired;
  17. import org.springframework.beans.factory.annotation.Value;
  18. import org.springframework.stereotype.Component;
  19. import org.springframework.util.StringUtils;
  20. import java.util.ArrayList;
  21. import java.util.List;
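/**
 * Helper that builds SQL statements for the quota statistics index, runs them against
 * Elasticsearch through the elasticsearch-sql (es4sql) parser and maps the result rows
 * onto {@link SaveModel} objects.
 *
 * <p>Example of the kind of statement that is generated:
 * <pre>
 * SELECT town,townName,sum(result1) result1 FROM wlyy_quota_test
 * where quotaCode='1'
 * group by town,townName , date_histogram(field='quotaDate','interval'='week')
 * </pre>
 *
 * Created by chenweida on 2017/7/17.
 */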
  28. @Component
  29. public class ElasticsearchUtil {
  30. @Autowired
  31. private ElasticFactory elasticFactory;
  32. @Value("${es.type}")
  33. private String esType;
  34. @Value("${es.index}")
  35. private String esIndex;
  36. /**
  37. * 折线图
  38. *
  39. * @param quotaCode 指标quotacode
  40. * @param code 机构code,或者区code或者团队code或者城市code
  41. * @param startDate 开始日期 yyyy-MM-dd
  42. * @param endDate 结束日期 yyyy-MM-dd
  43. * @param timeLevel 1增量 2到达量
  44. * @param areaLevel 1 省 2 市 3 区县 4 机构 5团队
  45. * @param interval 1日 2周 3月
  46. * @return
  47. */
  48. public List<SaveModel> findQuotaLines(String quotaCode,
  49. String code,
  50. String startDate,
  51. String endDate,
  52. String timeLevel,
  53. String areaLevel,
  54. String interval) {
  55. //时间格式转换 yyyy-MM-dd转成 2017-07-17T00:00:00+0800
  56. startDate = changeDate(startDate);
  57. //时间格式转换 yyyy-MM-dd转成 2017-07-17T00:00:00+0800
  58. endDate = changeDate(endDate);
  59. StringBuffer sql = new StringBuffer();
  60. StringBuffer groupBy = new StringBuffer();
  61. if (SaveModel.teamLevel.equals(areaLevel)) {
  62. sql.append("select team,teamName,result1,result2 from wlyy_quota_test where team='" + code + "'");
  63. } else if (SaveModel.OrgLevel.equals(areaLevel)) {
  64. sql.append("select hospital,hospitalName,sum(result1) result1,sum(result2) result2 from wlyy_quota_test where hospital='" + code + "'");
  65. groupBy.append(" group by hospital,hospitalName");
  66. } else if (SaveModel.townLevel.equals(areaLevel)) {
  67. sql.append("select town,townName,sum(result1) result1,sum(result2) result2 from wlyy_quota_test where town='" + code + "'");
  68. groupBy.append(" group by town,townName");
  69. } else if (SaveModel.cityLevel.equals(areaLevel)) {
  70. sql.append("select city,cityName,sum(result1) result1,sum(result2) result2 from wlyy_quota_test where city='" + code + "'");
  71. groupBy.append(" group by city,cityName");
  72. }
  73. sql.append(" and quotaCode='" + quotaCode + "' ");
  74. sql.append(" and timeLevel='" + timeLevel + "' ");
  75. sql.append(" and areaLevel='5'");
  76. sql.append(" and quotaDate >='" + startDate + "' ");
  77. sql.append(" and quotaDate <='" + endDate + "' ");
  78. //根据时间维度分组
  79. if (SaveModel.interval_month.equals(interval)) {
  80. groupBy.append(" ,date_histogram(field='quotaDate','interval'='month') ");
  81. } else if (SaveModel.interval_week.equals(interval)) {
  82. groupBy.append(" ,date_histogram(field='quotaDate','interval'='week') ");
  83. } else if (SaveModel.interval_day.equals(interval)) {
  84. groupBy.append(" ,date_histogram(field='quotaDate','interval'='1d') ");
  85. }
  86. sql.append(groupBy);
  87. return excute(sql.toString());
  88. }
  89. /**
  90. * 查询某个指标某一天的量
  91. *
  92. * @param quotaCode 指标quotacode
  93. * @param quotaDate 时间 yyyy-MM-dd
  94. * @param timeLevel 1增量 2到达量
  95. * @param areaLevel 1 省 2 市 3 区县 4 机构 5团队
  96. * @return
  97. */
  98. public List<SaveModel> findOneDateQuota(String quotaCode,
  99. String code,
  100. String quotaDate,
  101. String timeLevel,
  102. String areaLevel) {
  103. //时间格式转换 yyyy-MM-dd转成 2017-07-17T00:00:00+0800
  104. quotaDate = changeDate(quotaDate);
  105. StringBuffer sql = new StringBuffer();
  106. StringBuffer groupBy = new StringBuffer();
  107. if (SaveModel.teamLevel.equals(areaLevel)) {
  108. sql.append("select team,teamName,result1,result2 from wlyy_quota_test where team='" + code + "'");
  109. } else if (SaveModel.OrgLevel.equals(areaLevel)) {
  110. sql.append("select hospital,hospitalName,sum(result1) result1,sum(result2) result2 from wlyy_quota_test where hospital='" + code + "'");
  111. groupBy.append(" group by hospital,hospitalName");
  112. } else if (SaveModel.townLevel.equals(areaLevel)) {
  113. sql.append("select town,townName,sum(result1) result1,sum(result2) result2 from wlyy_quota_test where town='" + code + "'");
  114. groupBy.append(" group by town,townName");
  115. } else if (SaveModel.cityLevel.equals(areaLevel)) {
  116. sql.append("select city,cityName,sum(result1) result1,sum(result2) result2 from wlyy_quota_test where city='" + code + "'");
  117. groupBy.append(" group by city,cityName");
  118. }
  119. sql.append(" and quotaCode='" + quotaCode + "' ");
  120. sql.append(" and timeLevel='" + timeLevel + "' ");
  121. sql.append(" and areaLevel='5'");
  122. sql.append(" and quotaDate='" + quotaDate + "'");
  123. sql.append(groupBy);
  124. return excute(sql.toString());
  125. }
  126. /**
  127. * 查询某个2级维度指标某一天的数据
  128. *
  129. * @param quotaCode 指标quotacode
  130. * @param quotaDate 时间 yyyy-MM-dd
  131. * @param code 机构code或者团队code或者town code或者city code
  132. * @param timeLevel 1增量 2到达量
  133. * @param areaLevel 1 省 2 市 3 区县 4 机构 5团队
  134. * @return
  135. */
  136. public List<SaveModel> findOneDateQuotaLevel2(String quotaCode,
  137. String code,
  138. String quotaDate,
  139. String timeLevel,
  140. String areaLevel) {
  141. //时间格式转换 yyyy-MM-dd转成 2017-07-17T00:00:00+0800
  142. quotaDate = changeDate(quotaDate);
  143. StringBuffer sql = new StringBuffer();
  144. StringBuffer groupBy = new StringBuffer();
  145. if (SaveModel.teamLevel.equals(areaLevel)) {
  146. sql.append("select team,teamName,slaveKey1,slaveKey1Name,slaveKey2,slaveKey2Name,result1,result2 from wlyy_quota_test where team='" + code + "'");
  147. groupBy.append(" group by slaveKey1,slaveKey1Name,slaveKey2,slaveKey2Name");
  148. } else if (SaveModel.OrgLevel.equals(areaLevel)) {
  149. sql.append("select hospital,hospitalName,slaveKey1,slaveKey1Name,slaveKey2,slaveKey2Name,sum(result1) result1,sum(result2) result2 from wlyy_quota_test where hospital='" + code + "'");
  150. groupBy.append(" group by slaveKey1,slaveKey1Name,slaveKey2,slaveKey2Name");
  151. } else if (SaveModel.townLevel.equals(areaLevel)) {
  152. sql.append("select town,townName,slaveKey1,slaveKey1Name,slaveKey2,slaveKey2Name,sum(result1) result1,sum(result2) result2 from wlyy_quota_test where town='" + code + "'");
  153. groupBy.append(" group by slaveKey1,slaveKey1Name,slaveKey2,slaveKey2Name");
  154. } else if (SaveModel.cityLevel.equals(areaLevel)) {
  155. sql.append("select city,cityName,slaveKey1,slaveKey1Name,slaveKey2,slaveKey2Name,sum(result1) result1,sum(result2) result2 from wlyy_quota_test where city='" + code + "'");
  156. groupBy.append(" group by slaveKey1,slaveKey1Name,slaveKey2,slaveKey2Name");
  157. }
  158. sql.append(" and quotaCode='" + quotaCode + "' ");
  159. sql.append(" and timeLevel='" + timeLevel + "' ");
  160. sql.append(" and areaLevel='5'");
  161. sql.append(" and quotaDate='" + quotaDate + "'");
  162. sql.append(groupBy);
  163. return excute(sql.toString());
  164. }
  165. /**
  166. * 查询某个一级维度指标某一天的数据
  167. *
  168. * @param quotaCode 指标quotacode
  169. * @param quotaDate 时间 yyyy-MM-dd
  170. * @param code 机构code或者团队code或者town code或者city code
  171. * @param timeLevel 1增量 2到达量
  172. * @param areaLevel 1 省 2 市 3 区县 4 机构 5团队
  173. * @return
  174. */
  175. public List<SaveModel> findOneDateQuotaLevel1(String quotaCode,
  176. String code,
  177. String quotaDate,
  178. String timeLevel,
  179. String areaLevel) {
  180. //时间格式转换 yyyy-MM-dd转成 2017-07-17T00:00:00+0800
  181. quotaDate = changeDate(quotaDate);
  182. StringBuffer sql = new StringBuffer();
  183. StringBuffer groupBy = new StringBuffer();
  184. if (SaveModel.teamLevel.equals(areaLevel)) {
  185. sql.append("select team,teamName,slaveKey1,slaveKey1Name,result1,result2 from wlyy_quota_test where team='" + code + "'");
  186. groupBy.append(" group by team,teamName,slaveKey1,slaveKey1Name");
  187. } else if (SaveModel.OrgLevel.equals(areaLevel)) {
  188. sql.append("select hospital,hospitalName,slaveKey1,slaveKey1Name,sum(result1) result1,sum(result2) result2 from wlyy_quota_test where hospital='" + code + "'");
  189. groupBy.append(" group by hospital,hospitalName,slaveKey1,slaveKey1Name");
  190. } else if (SaveModel.townLevel.equals(areaLevel)) {
  191. sql.append("select town,townName,slaveKey1,slaveKey1Name,sum(result1) result1,sum(result2) result2 from wlyy_quota_test where town='" + code + "'");
  192. groupBy.append(" group by town,townName,slaveKey1,slaveKey1Name");
  193. } else if (SaveModel.cityLevel.equals(areaLevel)) {
  194. sql.append("select city,cityName,slaveKey1,slaveKey1Name,sum(result1) result1,sum(result2) result2 from wlyy_quota_test where city='" + code + "'");
  195. groupBy.append(" group by city,cityName,slaveKey1,slaveKey1Name");
  196. }
  197. sql.append(" and quotaCode='" + quotaCode + "' ");
  198. sql.append(" and timeLevel='" + timeLevel + "' ");
  199. sql.append(" and areaLevel='5'");
  200. sql.append(" and quotaDate='" + quotaDate + "'");
  201. sql.append(groupBy);
  202. return excute(sql.toString());
  203. }
  204. /**
  205. * 查询某一天父level下的子level 例如 查询市下面的团队,或者区下面的团队
  206. *
  207. * @param quotaCode 指标code
  208. * @param code 机构code或者团队code或者town code或者city code
  209. * @param quotaDate 指标code
  210. * @param timeLevel 1增量 2到达量
  211. * @param areaLevel 父arealevel
  212. * @param childAreaLevel 子arealevel
  213. * @return
  214. */
  215. public List<SaveModel> findOneDateQuotaByChllevel(String quotaCode,
  216. String code,
  217. String quotaDate,
  218. String timeLevel,
  219. String areaLevel,
  220. String childAreaLevel) {
  221. //时间格式转换 yyyy-MM-dd转成 2017-07-17T00:00:00+0800
  222. quotaDate = changeDate(quotaDate);
  223. StringBuffer sql = new StringBuffer();
  224. StringBuffer groupBy = new StringBuffer();
  225. //根据 childAreaLevel group by
  226. if (SaveModel.teamLevel.equals(childAreaLevel)) {
  227. sql.append("select team,teamName,result1,result2 from wlyy_quota_test where '");
  228. } else if (SaveModel.OrgLevel.equals(childAreaLevel)) {
  229. sql.append("select hospital,hospitalName,sum(result1) result1,sum(result2) result2 from wlyy_quota_test where ");
  230. groupBy.append(" group by hospital,hospitalName");
  231. } else if (SaveModel.townLevel.equals(childAreaLevel)) {
  232. sql.append("select town,townName,sum(result1) result1,sum(result2) result2 from wlyy_quota_test where ");
  233. groupBy.append(" group by town,townName");
  234. } else if (SaveModel.cityLevel.equals(childAreaLevel)) {
  235. sql.append("select city,cityName,sum(result1) result1,sum(result2) result2 from wlyy_quota_test where ");
  236. groupBy.append(" group by city,cityName");
  237. }
  238. sql.append(" quotaCode='" + quotaCode + "' ");
  239. sql.append(" and timeLevel='" + timeLevel + "' ");
  240. sql.append(" and areaLevel='5'");
  241. sql.append(" and quotaDate='" + quotaDate + "'");
  242. //查询code
  243. if (SaveModel.teamLevel.equals(areaLevel)) {
  244. sql.append(" and team='" + code + "'");
  245. } else if (SaveModel.OrgLevel.equals(areaLevel)) {
  246. sql.append(" and hospital='" + code + "'");
  247. } else if (SaveModel.townLevel.equals(areaLevel)) {
  248. sql.append(" and town='" + code + "'");
  249. } else if (SaveModel.cityLevel.equals(areaLevel)) {
  250. sql.append(" and city='" + code + "'");
  251. }
  252. sql.append(groupBy);
  253. return excute(sql.toString());
  254. }
  255. /**
  256. * 时间格式转换 yyyy-MM-dd转成 2017-07-17T00:00:00+0800
  257. *
  258. * @param quotaDate
  259. */
  260. private String changeDate(String quotaDate) {
  261. return quotaDate + "T00:00:00+0800";
  262. }
  263. /**
  264. * 执行sql查询es
  265. *
  266. * @param sql
  267. * @return
  268. */
  269. public List<SaveModel> excute(String sql) {
  270. List<SaveModel> saveModels = new ArrayList<>();
  271. try {
  272. SQLExprParser parser = new ElasticSqlExprParser(sql);
  273. SQLExpr expr = parser.expr();
  274. SQLQueryExpr queryExpr = (SQLQueryExpr) expr;
  275. Select select = null;
  276. select = new SqlParser().parseSelect(queryExpr);
  277. //通过抽象语法树,封装成自定义的Select,包含了select、from、where group、limit等
  278. AggregationQueryAction action = null;
  279. DefaultQueryAction queryAction = null;
  280. SqlElasticSearchRequestBuilder requestBuilder = null;
  281. if (select.isAgg) {
  282. //包含计算的的排序分组的
  283. action = new AggregationQueryAction(elasticFactory.getTransportClient(), select);
  284. requestBuilder = action.explain();
  285. } else {
  286. //封装成自己的Select对象
  287. queryAction = new DefaultQueryAction(elasticFactory.getTransportClient(), select);
  288. requestBuilder = queryAction.explain();
  289. }
  290. SearchResponse response = (SearchResponse) requestBuilder.get();
  291. ObjectResult temp = new ObjectResultsExtractor(true, true, true).extractResults(response.getAggregations(), true);
  292. List<String> heads = temp.getHeaders();
  293. temp.getLines().stream().forEach(one -> {
  294. try {
  295. SaveModel saveModel = new SaveModel();
  296. for (int i = 0; i < one.size(); i++) {
  297. String key = null;
  298. Object value = one.get(i);
  299. if (heads.get(i).contains("date_histogram")) {
  300. key = "setQuotaDate";
  301. value=DateUtil.strToDate(String.valueOf(value),"yyyy-MM-dd HH:mm:ss");
  302. } else {
  303. key = "set" + UpFirstStr(heads.get(i));
  304. }
  305. if (value instanceof String) {
  306. SaveModel.class.getMethod(key, String.class).invoke(saveModel, value);
  307. } else if (value instanceof Integer) {
  308. SaveModel.class.getMethod(key, Integer.class).invoke(saveModel, value);
  309. } else if (value instanceof Double) {
  310. SaveModel.class.getMethod(key, Integer.class).invoke(saveModel, ((Double) value).intValue());
  311. } else if (value instanceof java.util.Date) {
  312. SaveModel.class.getMethod(key, java.util.Date.class).invoke(saveModel, value);
  313. }
  314. }
  315. saveModels.add(saveModel);
  316. } catch (Exception e) {
  317. e.printStackTrace();
  318. }
  319. });
  320. } catch (Exception e) {
  321. e.printStackTrace();
  322. }
  323. return saveModels;
  324. }
  325. /**
  326. * 首字母大写
  327. *
  328. * @param str
  329. * @return
  330. */
  331. private String UpFirstStr(String str) {
  332. return str.replaceFirst(str.substring(0, 1), str.substring(0, 1).toUpperCase());
  333. }
  334. }