|  | @ -3,9 +3,17 @@ package com.yihu.iot.service.analyzer;
 | 
	
		
			
				|  |  | import com.alibaba.fastjson.JSONArray;
 | 
	
		
			
				|  |  | import com.alibaba.fastjson.JSONObject;
 | 
	
		
			
				|  |  | import com.fasterxml.jackson.databind.ObjectMapper;
 | 
	
		
			
				|  |  | import com.yihu.elasticsearch.ElasticSearchHelper;
 | 
	
		
			
				|  |  | import com.yihu.iot.datainput.service.DataInputService;
 | 
	
		
			
				|  |  | import com.yihu.iot.datainput.service.DataProcessLogService;
 | 
	
		
			
				|  |  | import com.yihu.iot.datainput.util.ConstantUtils;
 | 
	
		
			
				|  |  | import com.yihu.iot.datainput.util.RowKeyUtils;
 | 
	
		
			
				|  |  | import com.yihu.iot.service.common.MyJdbcTemplate;
 | 
	
		
			
				|  |  | import com.yihu.jw.datainput.DataBodySignsDO;
 | 
	
		
			
				|  |  | import com.yihu.jw.util.date.DateUtil;
 | 
	
		
			
				|  |  | import com.yihu.mysql.query.BaseJpaService;
 | 
	
		
			
				|  |  | import org.apache.commons.lang.StringUtils;
 | 
	
		
			
				|  |  | import org.apache.http.client.utils.DateUtils;
 | 
	
		
			
				|  |  | import org.slf4j.Logger;
 | 
	
		
			
				|  |  | import org.slf4j.LoggerFactory;
 | 
	
		
			
				|  |  | import org.springframework.beans.factory.annotation.Autowired;
 | 
	
	
		
			
				|  | @ -13,10 +21,8 @@ import org.springframework.jdbc.core.BeanPropertyRowMapper;
 | 
	
		
			
				|  |  | import org.springframework.jdbc.core.JdbcTemplate;
 | 
	
		
			
				|  |  | import org.springframework.stereotype.Service;
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  | import java.util.HashMap;
 | 
	
		
			
				|  |  | import java.util.LinkedHashMap;
 | 
	
		
			
				|  |  | import java.util.List;
 | 
	
		
			
				|  |  | import java.util.Map;
 | 
	
		
			
				|  |  | import java.io.IOException;
 | 
	
		
			
				|  |  | import java.util.*;
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  | /**
 | 
	
		
			
				|  |  |  * @author cws on 2019/6/16
 | 
	
	
		
			
				|  | @ -39,16 +45,161 @@ public class IotAnalyzerService extends BaseJpaService<WlyyIotD, WlyyIotDDao> {
 | 
	
		
			
				|  |  |     private MyJdbcTemplate myJdbcTemplate;
 | 
	
		
			
				|  |  |     @Autowired
 | 
	
		
			
				|  |  |     private JdbcTemplate jdbcTemplate;
 | 
	
		
			
				|  |  |     @Autowired
 | 
	
		
			
				|  |  |     private DataProcessLogService dataProcessLogService;
 | 
	
		
			
				|  |  |     @Autowired
 | 
	
		
			
				|  |  |     private ElasticSearchHelper elasticSearchHelper;
 | 
	
		
			
				|  |  |     @Autowired
 | 
	
		
			
				|  |  |     private DeviceHealthyInfoMappingDao deviceHealthyInfoMappingDao;
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  |     public void initIHealthIntoEs(){
 | 
	
		
			
				|  |  |         try {
 | 
	
		
			
				|  |  |             Integer id = 0;
 | 
	
		
			
				|  |  |             String sql = "SELECT i.id,i.user,i.value1,i.value2,i.value3,i.value4,i.type,i.record_date,i.device_sn,i.status,i.del,d.device_name,d.user_type,p.name,p.idcard " +
 | 
	
		
			
				|  |  |                     "from device.wlyy_patient_health_index i " +
 | 
	
		
			
				|  |  |                     "LEFT JOIN wlyy.wlyy_patient_device d on i.`user` = d.`user` and i.device_sn = d.device_sn " +
 | 
	
		
			
				|  |  |                     "LEFT JOIN wlyy.wlyy_patient p on i.`user` = p.code where i.id>? order by i.id limit 10000";
 | 
	
		
			
				|  |  |             List<JSONObject> list = myJdbcTemplate.queryJson(sql,new Object[]{id});
 | 
	
		
			
				|  |  |             while (list.size()>0){
 | 
	
		
			
				|  |  |                 List<String> saveList = new ArrayList<>();
 | 
	
		
			
				|  |  |                 List<DeviceHealthyInfoMapping> infos = new ArrayList<>();
 | 
	
		
			
				|  |  |                 for (int i=0;i<list.size();i++){
 | 
	
		
			
				|  |  |                     JSONObject one = list.get(i);
 | 
	
		
			
				|  |  |                     JSONObject tmp = inputBodySignsData(initUpload(one));
 | 
	
		
			
				|  |  |                     if(tmp==null){
 | 
	
		
			
				|  |  |                         logger.error("id:"+one.getInteger("id"));
 | 
	
		
			
				|  |  |                     }else {
 | 
	
		
			
				|  |  |                         saveList.add(tmp.toJSONString());
 | 
	
		
			
				|  |  |                         //保存映射关系
 | 
	
		
			
				|  |  |                         DeviceHealthyInfoMapping info = new DeviceHealthyInfoMapping();
 | 
	
		
			
				|  |  |                         info.setCreateTime(new Date());
 | 
	
		
			
				|  |  |                         info.setIndexId(one.getLong("id"));
 | 
	
		
			
				|  |  |                         info.setRid(tmp.getJSONArray("data").getJSONObject(0).getString("rid"));
 | 
	
		
			
				|  |  |                         infos.add(info);
 | 
	
		
			
				|  |  |                     }
 | 
	
		
			
				|  |  |                 }
 | 
	
		
			
				|  |  |                 //将数据存入es
 | 
	
		
			
				|  |  |                 boolean success = false;
 | 
	
		
			
				|  |  |                 try {
 | 
	
		
			
				|  |  |                     success = elasticSearchHelper.save(ConstantUtils.esIndex, ConstantUtils.esType, saveList);
 | 
	
		
			
				|  |  |                 }catch (Exception e){
 | 
	
		
			
				|  |  |                     e.printStackTrace();
 | 
	
		
			
				|  |  |                     logger.error("upload signBodyData to elasticsearch failed," + e.getMessage());
 | 
	
		
			
				|  |  |                 }
 | 
	
		
			
				|  |  |                 if(success){
 | 
	
		
			
				|  |  |                     logger.info("上传成功:"+list.size());
 | 
	
		
			
				|  |  |                 }else{
 | 
	
		
			
				|  |  |                     logger.info("初始化把i健康体征数据存入es,导入失败");
 | 
	
		
			
				|  |  |                 }
 | 
	
		
			
				|  |  |                 deviceHealthyInfoMappingDao.save(infos);
 | 
	
		
			
				|  |  |                 //下一次
 | 
	
		
			
				|  |  |                 id = list.get(list.size()-1).getInteger("id");
 | 
	
		
			
				|  |  |                 list = myJdbcTemplate.queryJson(sql,new Object[]{id});
 | 
	
		
			
				|  |  |             }
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  |         }catch (Exception e){
 | 
	
		
			
				|  |  |             e.printStackTrace();
 | 
	
		
			
				|  |  |         }
 | 
	
		
			
				|  |  |     }
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  |     /**
 | 
	
		
			
				|  |  |      * 初始化上传
 | 
	
		
			
				|  |  |      * @param jsonObject
 | 
	
		
			
				|  |  |      * @return
 | 
	
		
			
				|  |  |      */
 | 
	
		
			
				|  |  |     public String initUpload(JSONObject jsonObject){
 | 
	
		
			
				|  |  |         JSONObject json = new JSONObject();
 | 
	
		
			
				|  |  |         Integer type = jsonObject.getInteger("type");
 | 
	
		
			
				|  |  |         json.put("access_token","iHealth");
 | 
	
		
			
				|  |  |         json.put("data_source","iHealth");
 | 
	
		
			
				|  |  |         json.put("sn",StringUtils.trimToEmpty(jsonObject.getString("device_sn")));
 | 
	
		
			
				|  |  |         json.put("ext_code",StringUtils.trimToEmpty(jsonObject.getString("user_type")));
 | 
	
		
			
				|  |  |         if(StringUtils.isNotEmpty(jsonObject.getString("device_name"))){
 | 
	
		
			
				|  |  |             json.put("device_name",StringUtils.trimToEmpty(jsonObject.getString("device_name")));
 | 
	
		
			
				|  |  |             json.put("device_model",StringUtils.trimToEmpty(jsonObject.getString("device_name")));
 | 
	
		
			
				|  |  |         }else{
 | 
	
		
			
				|  |  |             //历史数据未绑定居民
 | 
	
		
			
				|  |  |             String sql = "SELECT i.device_code,i.device_model,i.device_name from device.wlyy_devices i WHERE device_code = '"+jsonObject.getString("device_sn")+"'";
 | 
	
		
			
				|  |  |             List<Map<String,Object>> list = jdbcTemplate.queryForList(sql);
 | 
	
		
			
				|  |  |             if(list.size()>0){
 | 
	
		
			
				|  |  |                 json.put("device_name",StringUtils.trimToEmpty(list.get(0).get("device_name").toString()));
 | 
	
		
			
				|  |  |                 json.put("device_model",StringUtils.trimToEmpty(list.get(0).get("device_model").toString()));
 | 
	
		
			
				|  |  |             }
 | 
	
		
			
				|  |  |         }
 | 
	
		
			
				|  |  |         json.put("idcard",StringUtils.trimToEmpty(jsonObject.getString("idcard")));
 | 
	
		
			
				|  |  |         json.put("username",StringUtils.trimToEmpty(jsonObject.getString("name")));
 | 
	
		
			
				|  |  |         json.put("usercode",StringUtils.trimToEmpty(jsonObject.getString("user")));
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  |         JSONArray jsonArray = new JSONArray();
 | 
	
		
			
				|  |  |         JSONObject js = new JSONObject();
 | 
	
		
			
				|  |  |         js.put("measure_time",subStringTime(jsonObject.getString("record_date")));
 | 
	
		
			
				|  |  |         js.put("del",StringUtils.trimToEmpty(jsonObject.getString("del")));
 | 
	
		
			
				|  |  |         js.put("status",getStatus(jsonObject.getString("status")));
 | 
	
		
			
				|  |  |         js.put("type",String.valueOf(type));
 | 
	
		
			
				|  |  |         switch (type){
 | 
	
		
			
				|  |  |             case 1:
 | 
	
		
			
				|  |  |                 //血糖
 | 
	
		
			
				|  |  |                 js.put("value1",jsonObject.getString("value1"));
 | 
	
		
			
				|  |  |                 js.put("value2",jsonObject.getString("value2"));
 | 
	
		
			
				|  |  |                 break;
 | 
	
		
			
				|  |  |             case 2:
 | 
	
		
			
				|  |  |                 //血压
 | 
	
		
			
				|  |  |                 js.put("value1",jsonObject.getString("value1"));
 | 
	
		
			
				|  |  |                 js.put("value2",jsonObject.getString("value2"));
 | 
	
		
			
				|  |  |                 if(StringUtils.isNotBlank(jsonObject.getString("value3"))){
 | 
	
		
			
				|  |  |                     js.put("value3",jsonObject.getString("value3"));
 | 
	
		
			
				|  |  |                 }
 | 
	
		
			
				|  |  |                 break;
 | 
	
		
			
				|  |  |             case 3:
 | 
	
		
			
				|  |  |                 //体重/身高/BMI
 | 
	
		
			
				|  |  |                 js.put("value1",jsonObject.getString("value1"));
 | 
	
		
			
				|  |  |                 js.put("value2",jsonObject.getString("value2"));
 | 
	
		
			
				|  |  |                 if(StringUtils.isNotBlank(jsonObject.getString("value3"))){
 | 
	
		
			
				|  |  |                     js.put("value3",jsonObject.getString("value3"));
 | 
	
		
			
				|  |  |                 }
 | 
	
		
			
				|  |  |                 break;
 | 
	
		
			
				|  |  |             case 4:
 | 
	
		
			
				|  |  |                 //腰围
 | 
	
		
			
				|  |  |                 js.put("value1",jsonObject.getString("value1"));
 | 
	
		
			
				|  |  |                 break;
 | 
	
		
			
				|  |  |             default:
 | 
	
		
			
				|  |  |                 break;
 | 
	
		
			
				|  |  |         }
 | 
	
		
			
				|  |  |         jsonArray.add(js);
 | 
	
		
			
				|  |  |         json.put("data",jsonArray);
 | 
	
		
			
				|  |  |         //上传
 | 
	
		
			
				|  |  |         return json.toJSONString();
 | 
	
		
			
				|  |  |     }
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  |     /**
 | 
	
		
			
				|  |  |      * 时间处理
 | 
	
		
			
				|  |  |      * @param time
 | 
	
		
			
				|  |  |      * @return
 | 
	
		
			
				|  |  |      */
 | 
	
		
			
				|  |  |     private String subStringTime(String time){
 | 
	
		
			
				|  |  |         return StringUtils.isBlank(time)? "":time.substring(0,19);
 | 
	
		
			
				|  |  |     }
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  |     /**
 | 
	
		
			
				|  |  |      * 状态默认值
 | 
	
		
			
				|  |  |      * @param status
 | 
	
		
			
				|  |  |      * @return
 | 
	
		
			
				|  |  |      */
 | 
	
		
			
				|  |  |     private String getStatus(String status){
 | 
	
		
			
				|  |  |         return StringUtils.isBlank(status)?"0":status;
 | 
	
		
			
				|  |  |     }
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  |     /**
 | 
	
		
			
				|  |  |      * 初始化把奕拓小屋数据存入es
 | 
	
		
			
				|  |  |      */
 | 
	
		
			
				|  |  |     public void initIntoEs(){
 | 
	
		
			
				|  |  |         String id = "";
 | 
	
		
			
				|  |  |         String sqlM = "SELECT * FROM wlyy_iot_m WHERE id >? and create_time is not null ORDER BY id limit 2";
 | 
	
		
			
				|  |  |         String sqlM = "SELECT * FROM wlyy_iot_m WHERE id >? and create_time is not null ORDER BY id limit 1000";
 | 
	
		
			
				|  |  |         String sqlD = "SELECT * from wlyy_iot_d WHERE mid = ?";
 | 
	
		
			
				|  |  |         List<WlyyIotM> list = jdbcTemplate.query(sqlM, new Object[] {id}, new BeanPropertyRowMapper<>(WlyyIotM.class));
 | 
	
		
			
				|  |  |         while (list.size()>0){
 | 
	
		
			
				|  |  |             List<String> saveList = new ArrayList<>();
 | 
	
		
			
				|  |  |             list.forEach(wlyyIotM->{
 | 
	
		
			
				|  |  |                 List<WlyyIotD> wlyyIotDList = jdbcTemplate.query(sqlD, new Object[] {wlyyIotM.getId()}, new BeanPropertyRowMapper<>(WlyyIotD.class));
 | 
	
		
			
				|  |  |                 JSONObject json = new JSONObject();
 | 
	
	
		
			
				|  | @ -72,10 +223,10 @@ public class IotAnalyzerService extends BaseJpaService<WlyyIotD, WlyyIotDDao> {
 | 
	
		
			
				|  |  |                 for(WlyyIotD iotD:wlyyIotDList){
 | 
	
		
			
				|  |  |                     String value =  "EcgData".equals(iotD.getCode())?iotD.getContent():iotD.getValue();
 | 
	
		
			
				|  |  |                     if(map2.containsKey(iotD.getType())){
 | 
	
		
			
				|  |  |                         map2.get(iotD.getType()).put(iotD.getCode(),iotD.getValue());
 | 
	
		
			
				|  |  |                         map2.get(iotD.getType()).put(iotD.getCode(),value);
 | 
	
		
			
				|  |  |                     }else{
 | 
	
		
			
				|  |  |                         Map<String,Object> map = new HashMap<>();
 | 
	
		
			
				|  |  |                         map.put(iotD.getCode(),iotD.getValue());
 | 
	
		
			
				|  |  |                         map.put(iotD.getCode(),value);
 | 
	
		
			
				|  |  |                         map2.put(iotD.getType(),map);
 | 
	
		
			
				|  |  |                     }
 | 
	
		
			
				|  |  |                 }
 | 
	
	
		
			
				|  | @ -93,17 +244,82 @@ public class IotAnalyzerService extends BaseJpaService<WlyyIotD, WlyyIotDDao> {
 | 
	
		
			
				|  |  |                     jsonArray.add(js);
 | 
	
		
			
				|  |  |                     json.put("data",jsonArray);
 | 
	
		
			
				|  |  |                     try {
 | 
	
		
			
				|  |  |                         dataInputService.inputBodySignsData(json.toJSONString());
 | 
	
		
			
				|  |  |                         JSONObject tmp = inputBodySignsData(json.toJSONString());
 | 
	
		
			
				|  |  |                         if(tmp!=null){
 | 
	
		
			
				|  |  |                             saveList.add(tmp.toJSONString());
 | 
	
		
			
				|  |  |                         }
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  |                     }catch (Exception e){
 | 
	
		
			
				|  |  |                         e.printStackTrace();
 | 
	
		
			
				|  |  |                     }
 | 
	
		
			
				|  |  |                 }
 | 
	
		
			
				|  |  |             });
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  |             //将数据存入es
 | 
	
		
			
				|  |  |             boolean success = false;
 | 
	
		
			
				|  |  |             try {
 | 
	
		
			
				|  |  |                 success = elasticSearchHelper.save(ConstantUtils.esIndex, ConstantUtils.esType, saveList);
 | 
	
		
			
				|  |  |             }catch (Exception e){
 | 
	
		
			
				|  |  |                 e.printStackTrace();
 | 
	
		
			
				|  |  |                 logger.error("upload signBodyData to elasticsearch failed," + e.getMessage());
 | 
	
		
			
				|  |  |             }
 | 
	
		
			
				|  |  |             if(success){
 | 
	
		
			
				|  |  |                 logger.info("上传成功:"+list.size());
 | 
	
		
			
				|  |  |             }else{
 | 
	
		
			
				|  |  |                 logger.info("初始化把i健康体征数据存入es,导入失败");
 | 
	
		
			
				|  |  |             }
 | 
	
		
			
				|  |  |             logger.info("初始化把奕拓小屋数据存入es,导入"+list.size()+"条,初始id:"+id);
 | 
	
		
			
				|  |  |             id = list.get(list.size()-1).getId();
 | 
	
		
			
				|  |  |             list = jdbcTemplate.query(sqlM, new Object[] {id}, new BeanPropertyRowMapper<>(WlyyIotM.class));
 | 
	
		
			
				|  |  |         }
 | 
	
		
			
				|  |  |     }
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  |     /**
 | 
	
		
			
				|  |  |      * 上传数据
 | 
	
		
			
				|  |  |      * @param json
 | 
	
		
			
				|  |  |      * @return
 | 
	
		
			
				|  |  |      */
 | 
	
		
			
				|  |  |     public JSONObject inputBodySignsData(String json) throws IOException {
 | 
	
		
			
				|  |  |         String rowkey = "";
 | 
	
		
			
				|  |  |         //提取json某些项值
 | 
	
		
			
				|  |  |         DataBodySignsDO dataBodySignsDO = null;
 | 
	
		
			
				|  |  |         try {
 | 
	
		
			
				|  |  |             dataBodySignsDO  = JSONObject.parseObject(json,DataBodySignsDO.class);
 | 
	
		
			
				|  |  |         }catch (Exception e){
 | 
	
		
			
				|  |  |             logger.error("json parse error,invalid json string");
 | 
	
		
			
				|  |  |             return null;
 | 
	
		
			
				|  |  |         }
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  |         JSONObject jsonObject = JSONObject.parseObject(json);
 | 
	
		
			
				|  |  |         String accessToken= dataBodySignsDO.getAccess_token();
 | 
	
		
			
				|  |  |         String dataSource = dataBodySignsDO.getData_source();
 | 
	
		
			
				|  |  |         String deviceSn = dataBodySignsDO.getSn();
 | 
	
		
			
				|  |  |         String extCode = dataBodySignsDO.getExt_code();
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  |         JSONArray jsonArray = jsonObject.getJSONArray("data");
 | 
	
		
			
				|  |  |         if(null == jsonArray || jsonArray.size() == 0){
 | 
	
		
			
				|  |  |             logger.error("msg","parameter 'data' of json no exist");
 | 
	
		
			
				|  |  |             return null;
 | 
	
		
			
				|  |  |         }
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  |         //循环数据,一组数据存一行,生成一个rowkey,并将该rowkey存到es中
 | 
	
		
			
				|  |  |         for(Object obj:jsonArray){
 | 
	
		
			
				|  |  |             JSONObject data = (JSONObject)obj;
 | 
	
		
			
				|  |  |             data.put("del","1"); //添加删除标记
 | 
	
		
			
				|  |  |             try {
 | 
	
		
			
				|  |  |                 String measuretime = jsonObject.getString("measure_time");
 | 
	
		
			
				|  |  |                 if(null == measuretime){
 | 
	
		
			
				|  |  |                     measuretime = DateUtils.formatDate(new Date(), DateUtil.yyyy_MM_dd_HH_mm_ss);
 | 
	
		
			
				|  |  |                 }
 | 
	
		
			
				|  |  |                 //生成一份json数据的rowkey
 | 
	
		
			
				|  |  |                 rowkey = RowKeyUtils.makeRowKey(dataSource,deviceSn);
 | 
	
		
			
				|  |  |                 data.put("rid",rowkey);//hbase的rowkey
 | 
	
		
			
				|  |  |             } catch (Exception e) {
 | 
	
		
			
				|  |  |                 logger.error("make rowkey error");
 | 
	
		
			
				|  |  |                 return null;
 | 
	
		
			
				|  |  |             }
 | 
	
		
			
				|  |  |         }
 | 
	
		
			
				|  |  |         return jsonObject;
 | 
	
		
			
				|  |  |     }
 | 
	
		
			
				|  |  | 
 | 
	
		
			
				|  |  |     /**
 |