|
@ -3,9 +3,17 @@ package com.yihu.iot.service.analyzer;
|
|
import com.alibaba.fastjson.JSONArray;
|
|
import com.alibaba.fastjson.JSONArray;
|
|
import com.alibaba.fastjson.JSONObject;
|
|
import com.alibaba.fastjson.JSONObject;
|
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
|
|
|
import com.yihu.elasticsearch.ElasticSearchHelper;
|
|
import com.yihu.iot.datainput.service.DataInputService;
|
|
import com.yihu.iot.datainput.service.DataInputService;
|
|
|
|
import com.yihu.iot.datainput.service.DataProcessLogService;
|
|
|
|
import com.yihu.iot.datainput.util.ConstantUtils;
|
|
|
|
import com.yihu.iot.datainput.util.RowKeyUtils;
|
|
import com.yihu.iot.service.common.MyJdbcTemplate;
|
|
import com.yihu.iot.service.common.MyJdbcTemplate;
|
|
|
|
import com.yihu.jw.datainput.DataBodySignsDO;
|
|
|
|
import com.yihu.jw.util.date.DateUtil;
|
|
import com.yihu.mysql.query.BaseJpaService;
|
|
import com.yihu.mysql.query.BaseJpaService;
|
|
|
|
import org.apache.commons.lang.StringUtils;
|
|
|
|
import org.apache.http.client.utils.DateUtils;
|
|
import org.slf4j.Logger;
|
|
import org.slf4j.Logger;
|
|
import org.slf4j.LoggerFactory;
|
|
import org.slf4j.LoggerFactory;
|
|
import org.springframework.beans.factory.annotation.Autowired;
|
|
import org.springframework.beans.factory.annotation.Autowired;
|
|
@ -13,10 +21,8 @@ import org.springframework.jdbc.core.BeanPropertyRowMapper;
|
|
import org.springframework.jdbc.core.JdbcTemplate;
|
|
import org.springframework.jdbc.core.JdbcTemplate;
|
|
import org.springframework.stereotype.Service;
|
|
import org.springframework.stereotype.Service;
|
|
|
|
|
|
import java.util.HashMap;
|
|
|
|
import java.util.LinkedHashMap;
|
|
|
|
import java.util.List;
|
|
|
|
import java.util.Map;
|
|
|
|
|
|
import java.io.IOException;
|
|
|
|
import java.util.*;
|
|
|
|
|
|
/**
|
|
/**
|
|
* @author cws on 2019/6/16
|
|
* @author cws on 2019/6/16
|
|
@ -39,16 +45,161 @@ public class IotAnalyzerService extends BaseJpaService<WlyyIotD, WlyyIotDDao> {
|
|
private MyJdbcTemplate myJdbcTemplate;
|
|
private MyJdbcTemplate myJdbcTemplate;
|
|
@Autowired
|
|
@Autowired
|
|
private JdbcTemplate jdbcTemplate;
|
|
private JdbcTemplate jdbcTemplate;
|
|
|
|
@Autowired
|
|
|
|
private DataProcessLogService dataProcessLogService;
|
|
|
|
@Autowired
|
|
|
|
private ElasticSearchHelper elasticSearchHelper;
|
|
|
|
@Autowired
|
|
|
|
private DeviceHealthyInfoMappingDao deviceHealthyInfoMappingDao;
|
|
|
|
|
|
|
|
public void initIHealthIntoEs(){
|
|
|
|
try {
|
|
|
|
Integer id = 0;
|
|
|
|
String sql = "SELECT i.id,i.user,i.value1,i.value2,i.value3,i.value4,i.type,i.record_date,i.device_sn,i.status,i.del,d.device_name,d.user_type,p.name,p.idcard " +
|
|
|
|
"from device.wlyy_patient_health_index i " +
|
|
|
|
"LEFT JOIN wlyy.wlyy_patient_device d on i.`user` = d.`user` and i.device_sn = d.device_sn " +
|
|
|
|
"LEFT JOIN wlyy.wlyy_patient p on i.`user` = p.code where i.id>? order by i.id limit 10000";
|
|
|
|
List<JSONObject> list = myJdbcTemplate.queryJson(sql,new Object[]{id});
|
|
|
|
while (list.size()>0){
|
|
|
|
List<String> saveList = new ArrayList<>();
|
|
|
|
List<DeviceHealthyInfoMapping> infos = new ArrayList<>();
|
|
|
|
for (int i=0;i<list.size();i++){
|
|
|
|
JSONObject one = list.get(i);
|
|
|
|
JSONObject tmp = inputBodySignsData(initUpload(one));
|
|
|
|
if(tmp==null){
|
|
|
|
logger.error("id:"+one.getInteger("id"));
|
|
|
|
}else {
|
|
|
|
saveList.add(tmp.toJSONString());
|
|
|
|
//保存映射关系
|
|
|
|
DeviceHealthyInfoMapping info = new DeviceHealthyInfoMapping();
|
|
|
|
info.setCreateTime(new Date());
|
|
|
|
info.setIndexId(one.getLong("id"));
|
|
|
|
info.setRid(tmp.getJSONArray("data").getJSONObject(0).getString("rid"));
|
|
|
|
infos.add(info);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
//将数据存入es
|
|
|
|
boolean success = false;
|
|
|
|
try {
|
|
|
|
success = elasticSearchHelper.save(ConstantUtils.esIndex, ConstantUtils.esType, saveList);
|
|
|
|
}catch (Exception e){
|
|
|
|
e.printStackTrace();
|
|
|
|
logger.error("upload signBodyData to elasticsearch failed," + e.getMessage());
|
|
|
|
}
|
|
|
|
if(success){
|
|
|
|
logger.info("上传成功:"+list.size());
|
|
|
|
}else{
|
|
|
|
logger.info("初始化把i健康体征数据存入es,导入失败");
|
|
|
|
}
|
|
|
|
deviceHealthyInfoMappingDao.save(infos);
|
|
|
|
//下一次
|
|
|
|
id = list.get(list.size()-1).getInteger("id");
|
|
|
|
list = myJdbcTemplate.queryJson(sql,new Object[]{id});
|
|
|
|
}
|
|
|
|
|
|
|
|
}catch (Exception e){
|
|
|
|
e.printStackTrace();
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
/**
|
|
|
|
* 初始化上传
|
|
|
|
* @param jsonObject
|
|
|
|
* @return
|
|
|
|
*/
|
|
|
|
public String initUpload(JSONObject jsonObject){
|
|
|
|
JSONObject json = new JSONObject();
|
|
|
|
Integer type = jsonObject.getInteger("type");
|
|
|
|
json.put("access_token","iHealth");
|
|
|
|
json.put("data_source","iHealth");
|
|
|
|
json.put("sn",StringUtils.trimToEmpty(jsonObject.getString("device_sn")));
|
|
|
|
json.put("ext_code",StringUtils.trimToEmpty(jsonObject.getString("user_type")));
|
|
|
|
if(StringUtils.isNotEmpty(jsonObject.getString("device_name"))){
|
|
|
|
json.put("device_name",StringUtils.trimToEmpty(jsonObject.getString("device_name")));
|
|
|
|
json.put("device_model",StringUtils.trimToEmpty(jsonObject.getString("device_name")));
|
|
|
|
}else{
|
|
|
|
//历史数据未绑定居民
|
|
|
|
String sql = "SELECT i.device_code,i.device_model,i.device_name from device.wlyy_devices i WHERE device_code = '"+jsonObject.getString("device_sn")+"'";
|
|
|
|
List<Map<String,Object>> list = jdbcTemplate.queryForList(sql);
|
|
|
|
if(list.size()>0){
|
|
|
|
json.put("device_name",StringUtils.trimToEmpty(list.get(0).get("device_name").toString()));
|
|
|
|
json.put("device_model",StringUtils.trimToEmpty(list.get(0).get("device_model").toString()));
|
|
|
|
}
|
|
|
|
}
|
|
|
|
json.put("idcard",StringUtils.trimToEmpty(jsonObject.getString("idcard")));
|
|
|
|
json.put("username",StringUtils.trimToEmpty(jsonObject.getString("name")));
|
|
|
|
json.put("usercode",StringUtils.trimToEmpty(jsonObject.getString("user")));
|
|
|
|
|
|
|
|
JSONArray jsonArray = new JSONArray();
|
|
|
|
JSONObject js = new JSONObject();
|
|
|
|
js.put("measure_time",subStringTime(jsonObject.getString("record_date")));
|
|
|
|
js.put("del",StringUtils.trimToEmpty(jsonObject.getString("del")));
|
|
|
|
js.put("status",getStatus(jsonObject.getString("status")));
|
|
|
|
js.put("type",String.valueOf(type));
|
|
|
|
switch (type){
|
|
|
|
case 1:
|
|
|
|
//血糖
|
|
|
|
js.put("value1",jsonObject.getString("value1"));
|
|
|
|
js.put("value2",jsonObject.getString("value2"));
|
|
|
|
break;
|
|
|
|
case 2:
|
|
|
|
//血压
|
|
|
|
js.put("value1",jsonObject.getString("value1"));
|
|
|
|
js.put("value2",jsonObject.getString("value2"));
|
|
|
|
if(StringUtils.isNotBlank(jsonObject.getString("value3"))){
|
|
|
|
js.put("value3",jsonObject.getString("value3"));
|
|
|
|
}
|
|
|
|
break;
|
|
|
|
case 3:
|
|
|
|
//体重/身高/BMI
|
|
|
|
js.put("value1",jsonObject.getString("value1"));
|
|
|
|
js.put("value2",jsonObject.getString("value2"));
|
|
|
|
if(StringUtils.isNotBlank(jsonObject.getString("value3"))){
|
|
|
|
js.put("value3",jsonObject.getString("value3"));
|
|
|
|
}
|
|
|
|
break;
|
|
|
|
case 4:
|
|
|
|
//腰围
|
|
|
|
js.put("value1",jsonObject.getString("value1"));
|
|
|
|
break;
|
|
|
|
default:
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
jsonArray.add(js);
|
|
|
|
json.put("data",jsonArray);
|
|
|
|
//上传
|
|
|
|
return json.toJSONString();
|
|
|
|
}
|
|
|
|
|
|
|
|
/**
|
|
|
|
* 时间处理
|
|
|
|
* @param time
|
|
|
|
* @return
|
|
|
|
*/
|
|
|
|
private String subStringTime(String time){
|
|
|
|
return StringUtils.isBlank(time)? "":time.substring(0,19);
|
|
|
|
}
|
|
|
|
|
|
|
|
/**
|
|
|
|
* 状态默认值
|
|
|
|
* @param status
|
|
|
|
* @return
|
|
|
|
*/
|
|
|
|
private String getStatus(String status){
|
|
|
|
return StringUtils.isBlank(status)?"0":status;
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
/**
|
|
* 初始化把奕拓小屋数据存入es
|
|
* 初始化把奕拓小屋数据存入es
|
|
*/
|
|
*/
|
|
public void initIntoEs(){
|
|
public void initIntoEs(){
|
|
String id = "";
|
|
String id = "";
|
|
String sqlM = "SELECT * FROM wlyy_iot_m WHERE id >? and create_time is not null ORDER BY id limit 2";
|
|
|
|
|
|
String sqlM = "SELECT * FROM wlyy_iot_m WHERE id >? and create_time is not null ORDER BY id limit 1000";
|
|
String sqlD = "SELECT * from wlyy_iot_d WHERE mid = ?";
|
|
String sqlD = "SELECT * from wlyy_iot_d WHERE mid = ?";
|
|
List<WlyyIotM> list = jdbcTemplate.query(sqlM, new Object[] {id}, new BeanPropertyRowMapper<>(WlyyIotM.class));
|
|
List<WlyyIotM> list = jdbcTemplate.query(sqlM, new Object[] {id}, new BeanPropertyRowMapper<>(WlyyIotM.class));
|
|
while (list.size()>0){
|
|
while (list.size()>0){
|
|
|
|
List<String> saveList = new ArrayList<>();
|
|
list.forEach(wlyyIotM->{
|
|
list.forEach(wlyyIotM->{
|
|
List<WlyyIotD> wlyyIotDList = jdbcTemplate.query(sqlD, new Object[] {wlyyIotM.getId()}, new BeanPropertyRowMapper<>(WlyyIotD.class));
|
|
List<WlyyIotD> wlyyIotDList = jdbcTemplate.query(sqlD, new Object[] {wlyyIotM.getId()}, new BeanPropertyRowMapper<>(WlyyIotD.class));
|
|
JSONObject json = new JSONObject();
|
|
JSONObject json = new JSONObject();
|
|
@ -72,10 +223,10 @@ public class IotAnalyzerService extends BaseJpaService<WlyyIotD, WlyyIotDDao> {
|
|
for(WlyyIotD iotD:wlyyIotDList){
|
|
for(WlyyIotD iotD:wlyyIotDList){
|
|
String value = "EcgData".equals(iotD.getCode())?iotD.getContent():iotD.getValue();
|
|
String value = "EcgData".equals(iotD.getCode())?iotD.getContent():iotD.getValue();
|
|
if(map2.containsKey(iotD.getType())){
|
|
if(map2.containsKey(iotD.getType())){
|
|
map2.get(iotD.getType()).put(iotD.getCode(),iotD.getValue());
|
|
|
|
|
|
map2.get(iotD.getType()).put(iotD.getCode(),value);
|
|
}else{
|
|
}else{
|
|
Map<String,Object> map = new HashMap<>();
|
|
Map<String,Object> map = new HashMap<>();
|
|
map.put(iotD.getCode(),iotD.getValue());
|
|
|
|
|
|
map.put(iotD.getCode(),value);
|
|
map2.put(iotD.getType(),map);
|
|
map2.put(iotD.getType(),map);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
@ -93,17 +244,82 @@ public class IotAnalyzerService extends BaseJpaService<WlyyIotD, WlyyIotDDao> {
|
|
jsonArray.add(js);
|
|
jsonArray.add(js);
|
|
json.put("data",jsonArray);
|
|
json.put("data",jsonArray);
|
|
try {
|
|
try {
|
|
dataInputService.inputBodySignsData(json.toJSONString());
|
|
|
|
|
|
JSONObject tmp = inputBodySignsData(json.toJSONString());
|
|
|
|
if(tmp!=null){
|
|
|
|
saveList.add(tmp.toJSONString());
|
|
|
|
}
|
|
|
|
|
|
}catch (Exception e){
|
|
}catch (Exception e){
|
|
e.printStackTrace();
|
|
e.printStackTrace();
|
|
}
|
|
}
|
|
}
|
|
}
|
|
});
|
|
});
|
|
|
|
|
|
|
|
//将数据存入es
|
|
|
|
boolean success = false;
|
|
|
|
try {
|
|
|
|
success = elasticSearchHelper.save(ConstantUtils.esIndex, ConstantUtils.esType, saveList);
|
|
|
|
}catch (Exception e){
|
|
|
|
e.printStackTrace();
|
|
|
|
logger.error("upload signBodyData to elasticsearch failed," + e.getMessage());
|
|
|
|
}
|
|
|
|
if(success){
|
|
|
|
logger.info("上传成功:"+list.size());
|
|
|
|
}else{
|
|
|
|
logger.info("初始化把i健康体征数据存入es,导入失败");
|
|
|
|
}
|
|
logger.info("初始化把奕拓小屋数据存入es,导入"+list.size()+"条,初始id:"+id);
|
|
logger.info("初始化把奕拓小屋数据存入es,导入"+list.size()+"条,初始id:"+id);
|
|
id = list.get(list.size()-1).getId();
|
|
id = list.get(list.size()-1).getId();
|
|
list = jdbcTemplate.query(sqlM, new Object[] {id}, new BeanPropertyRowMapper<>(WlyyIotM.class));
|
|
list = jdbcTemplate.query(sqlM, new Object[] {id}, new BeanPropertyRowMapper<>(WlyyIotM.class));
|
|
}
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
/**
|
|
|
|
* 上传数据
|
|
|
|
* @param json
|
|
|
|
* @return
|
|
|
|
*/
|
|
|
|
public JSONObject inputBodySignsData(String json) throws IOException {
|
|
|
|
String rowkey = "";
|
|
|
|
//提取json某些项值
|
|
|
|
DataBodySignsDO dataBodySignsDO = null;
|
|
|
|
try {
|
|
|
|
dataBodySignsDO = JSONObject.parseObject(json,DataBodySignsDO.class);
|
|
|
|
}catch (Exception e){
|
|
|
|
logger.error("json parse error,invalid json string");
|
|
|
|
return null;
|
|
|
|
}
|
|
|
|
|
|
|
|
JSONObject jsonObject = JSONObject.parseObject(json);
|
|
|
|
String accessToken= dataBodySignsDO.getAccess_token();
|
|
|
|
String dataSource = dataBodySignsDO.getData_source();
|
|
|
|
String deviceSn = dataBodySignsDO.getSn();
|
|
|
|
String extCode = dataBodySignsDO.getExt_code();
|
|
|
|
|
|
|
|
JSONArray jsonArray = jsonObject.getJSONArray("data");
|
|
|
|
if(null == jsonArray || jsonArray.size() == 0){
|
|
|
|
logger.error("msg","parameter 'data' of json no exist");
|
|
|
|
return null;
|
|
|
|
}
|
|
|
|
|
|
|
|
//循环数据,一组数据存一行,生成一个rowkey,并将该rowkey存到es中
|
|
|
|
for(Object obj:jsonArray){
|
|
|
|
JSONObject data = (JSONObject)obj;
|
|
|
|
data.put("del","1"); //添加删除标记
|
|
|
|
try {
|
|
|
|
String measuretime = jsonObject.getString("measure_time");
|
|
|
|
if(null == measuretime){
|
|
|
|
measuretime = DateUtils.formatDate(new Date(), DateUtil.yyyy_MM_dd_HH_mm_ss);
|
|
|
|
}
|
|
|
|
//生成一份json数据的rowkey
|
|
|
|
rowkey = RowKeyUtils.makeRowKey(dataSource,deviceSn);
|
|
|
|
data.put("rid",rowkey);//hbase的rowkey
|
|
|
|
} catch (Exception e) {
|
|
|
|
logger.error("make rowkey error");
|
|
|
|
return null;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return jsonObject;
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|