/** * 会话模型。 */ "use strict"; let RedisClient = require('../../repository/redis/redis.client.js'); let RedisModel = require('./../redis.model.js'); let ModelUtil = require('../../util/model.util'); let Messages = require('../messages/messages'); let Users = require('../user/users'); let DoctorRepo = require('../../repository/oracle/doctor.repo'); let Participants = require('./participants'); let SessionRepo = require('../../repository/oracle/session.repo'); let TopicRepo = require('../../repository/oracle/topics.repo'); let MessageRepo = require('../../repository/oracle/message.repo'); let ParticipantRepo = require('../../repository/oracle/participant.repo'); let ImDb = require('../../repository/oracle/db/im.db'); let WlyySDK = require("../../util/wlyy.sdk"); let ObjectUtil = require("../../util/object.util.js"); let WechatClient = require("../client/wechat.client.js"); let AppClient = require("../client/app.client.js"); let configFile = require('../../include/commons').CONFIG_FILE; let config = require('../../resources/config/' + configFile); let redis = RedisClient.redisClient().connection; let logger = require('../../util/log.js'); let mongoose = require('mongoose'); let async = require("async"); let log = require("../../util/log.js"); let pubSub = require("../redis/pubSub.js"); const REDIS_KEYS = require('../../include/commons').REDIS_KEYS; const SESSION_TYPES = require('../../include/commons').SESSION_TYPES; const STICKY_SESSION_BASE_SCORE = require('../../include/commons').STICKY_SESSION_BASE_SCORE; const PARTICIPANT_ROLES = require('../../include/commons').PARTICIPANT_ROLES; const CONTENT_TYPES = require('../../include/commons').CONTENT_TYPES; const SESSION_BUSINESS_TYPE = require('../../include/commons').SESSION_BUSINESS_TYPE; const SESSION_STATUS = require('../../include/commons').SESSION_STATUS; class Sessions extends RedisModel { constructor() { super(); } /** * 创建会话。会话的ID来源: * MUC:患者的ID * P2P:对成员的ID排序后,取hash值 * GROUP:团队的ID * * @param sessionId * @param name 会话名称 * @param type 会话类型 * @param participantArray 会话成员 * @param handler 回调,仅MUC模式使用 */ createSession(sessionId, name, type, participantArray, handler) { let self = this; let messageId = mongoose.Types.ObjectId().toString(); //创建session到mysql self.createSessionToMysql(sessionId, name, type, participantArray, messageId, function (err, res) { if (err) { if (handler) { handler(err, null); return; } ModelUtil.emitError(self.eventEmitter, {message: err, status: -1}, null); } else { name = res.name; //创建session到redis self.createSessionToRedis(sessionId, name, type, participantArray, messageId, function (err, res) { if (err) { if (handler) { handler(err, null); return; } ModelUtil.emitError(self.eventEmitter, {message: err, status: -1}, null); } else { if (handler) { handler(null, res); return; } ModelUtil.emitOK(self.eventEmitter, {status: 200, data: res}); } }); } }); } /** * 创建会话。REDIS * @param sessionId * @param name * @param type * @param participantArray * @param messageId * @param handler * @returns {boolean} */ createSessionToRedis(sessionId, name, type, participantArray, messageId, handler) { let self = this; let messages = new Messages(); let participantIdArray = []; for (let i in participantArray) { participantIdArray.push(participantArray[i].split(":")[0]); } if (type == SESSION_TYPES.P2P || type == SESSION_TYPES.SYSTEM) { if (sessionId) { callBusinessType(sessionId); return; } else if (participantIdArray.length != 2) { handler("P2P session only allow 2 participants.", null); return false; }else{ ParticipantRepo.findSessionIdByParticipantIds(participantIdArray[0], participantIdArray[1], function (err, res) { sessionId = res; callBusinessType(sessionId); return; }); } } else { if (!sessionId) { if(type == SESSION_TYPES.DISCUSSION){ sessionId = messageId; }else { handler("MUC OR GROUP session sessionId is not allow null .", null); return; } } callBusinessType(sessionId); } function callBusinessType(sessionId) { if(type == SESSION_TYPES.MUC|| type == SESSION_TYPES.PRESCRIPTION || SESSION_TYPES.PRESCRIPTION_HOSPITAL || SESSION_TYPES.COLLABORATION_HOSPITAL || SESSION_TYPES.GUIDANCE_HOSPITAL || SESSION_TYPES.GENERAL_EXPERT || SESSION_TYPES.PRESCRIPTION_HOSPITAL_VIDEO || SESSION_TYPES.MUC_VIDEO || SESSION_TYPES.ONDOOR_NURSING){ callCreate(sessionId, SESSION_BUSINESS_TYPE.PATIENT); }else if(type==SESSION_TYPES.DISCUSSION||type==SESSION_TYPES.GROUP){ callCreate(sessionId, SESSION_BUSINESS_TYPE.DOCTOR); }else { ParticipantRepo.getBusinessType(participantIdArray, function (err, businessType) { callCreate(sessionId, businessType); }); } } function callCreate(sessionId, businessType) { let createDate = new Date(); Participants.saveParticipantsToRedis(sessionId, participantArray, createDate, function (res) { let sessionKey = RedisModel.makeRedisKey(REDIS_KEYS.Session, sessionId); if (type == SESSION_TYPES.MUC|| type == SESSION_TYPES.PRESCRIPTION || SESSION_TYPES.PRESCRIPTION_HOSPITAL || SESSION_TYPES.COLLABORATION_HOSPITAL || SESSION_TYPES.GUIDANCE_HOSPITAL || SESSION_TYPES.GENERAL_EXPERT || SESSION_TYPES.PRESCRIPTION_HOSPITAL_VIDEO || SESSION_TYPES.MUC_VIDEO || SESSION_TYPES.ONDOOR_NURSING) { businessType = 2; } let session = { id: sessionId, name: name, type: type, create_date: createDate.getTime(), business_type: businessType }; //如果会话已经存在的就不需要发送会话成功的消息不更新最后一条消息 redis.hexistsAsync(sessionKey, sessionId).then(function(res){ if(res==0){ redis.hmsetAsync(sessionKey, session).then(function () { handler(null, session); }) } }) }); } } /** * 创建会话。mysql * @param sessionId * @param name * @param type * @param participantArray * @param messageId * @param handler */ createSessionToMysql(sessionId, name, type, participantArray, messageId, handler) { let self = this; //如果sessionId不存在则执行创建sessionId过程 let participantIdArray = []; for (let i in participantArray) { participantIdArray.push(participantArray[i].split(":")[0]); } //流程1-判断是否存在sessionId不存在则创建对应的sessionId; if (!sessionId) { if (type == SESSION_TYPES.P2P || type == SESSION_TYPES.SYSTEM) { ParticipantRepo.findSessionIdByParticipantIds(participantIdArray[0], participantIdArray[1], function (err, res) { sessionId = res; callBusinessType(); }); } else if(type == SESSION_TYPES.DISCUSSION){ sessionId = messageId; callBusinessType(); } else { return handler("MUC模式和团队模式,不允许sessionId为空!", null); } } else { callBusinessType(); } //流程2-判断session的业务类型; function callBusinessType() { if(type==SESSION_TYPES.MUC|| type==SESSION_TYPES.PRESCRIPTION || SESSION_TYPES.PRESCRIPTION_HOSPITAL || SESSION_TYPES.COLLABORATION_HOSPITAL || SESSION_TYPES.GUIDANCE_HOSPITAL || SESSION_TYPES.GENERAL_EXPERT || SESSION_TYPES.PRESCRIPTION_HOSPITAL_VIDEO || SESSION_TYPES.MUC_VIDEO || SESSION_TYPES.ONDOOR_NURSING){ callCreateSession(SESSION_BUSINESS_TYPE.PATIENT); }else if(type==SESSION_TYPES.SPECIALISTGROUP||type==SESSION_TYPES.DISCUSSION){ callCreateSession(SESSION_BUSINESS_TYPE.DOCTOR); }else{ ParticipantRepo.getBusinessType(participantIdArray, function (err, businessType) { if (err) { handler(err, null); return; } callCreateSession(businessType); }); } } //流程3-发起session创建 返回session实例 function callCreateSession(businessType) { //查找该sessionId是否存在存在则直接返回实例 SessionRepo.findOne(sessionId, function (err, res) { if (res && res.length > 0) {//已经存在 //已存在的会话不修改名称 name = res[0].name; //更新成员 Participants.saveParticipantsToMysql(sessionId, participantArray, function (err, update) { handler(null, res[0]); return; }) let createDate = new Date(); self.saveSessionToMysql(sessionId, name, type, createDate, businessType, function (err, res) { logger.info("update session status is true"); }) } else { let createDate = new Date(); let session = { id: sessionId, name: name, type: type, create_date: createDate.getTime(), business_type: businessType }; //将session写入数据库 self.saveSessionToMysql(sessionId, name, type, createDate, businessType, function (err, res) { if (err) { handler(err, null); return; } callCreateParticipants(session); }) } }); } //流程4-发起session成员创建 function callCreateParticipants(session) { Participants.saveParticipantsToMysql(sessionId, participantArray, function (err, res) { if (err) { handler(err, null); return; } else { handler(null, session); return; } }) } } /** * 最近会话列表,7天内。 * * @param userId * @param dateSpan */ getRecentSessions(userId, dateSpan) { let self = this; SessionRepo.findAllByTimestampAndType(userId, dateSpan, function (err, res) { if (err) { ModelUtil.emitError(self.eventEmitter, "Get recent sessions failed", err); return; } let sessions = []; res.forEach(function (session) { //最近列表用于转发,过滤不可用的咨询 if(session.last_content_type != CONTENT_TYPES.TopicEnd){ sessions.push({ id: session.id, name: session.name, type: session.type, business_type: session.business_type, create_date: session.create_date }) } }); ModelUtil.emitOK(self.eventEmitter, sessions); }); } /** * 保存session到MySQL * @param sessionId * @param name * @param type * @param createDate * @param businessType * @param handler */ saveSessionToMysql(sessionId, name, type, createDate, businessType, handler) { SessionRepo.saveSession(sessionId, name, type, createDate, businessType, handler); } /** * 获取某个用户的全部session列表 * @param userId * @param handler */ getUserSessionsFromMysql(userId, handler) { SessionRepo.findAll(userId, handler); } /** * 获取session单个对象 * @param sessionId * @param handler */ getSessions(sessionId, handler) { SessionRepo.findOne(sessionId, handler); } /** * 判断会话是否存在 * @param sessionId * @param handler */ isExist(sessionId) { let self = this; SessionRepo.findOne(sessionId, function (err, res) { if(err){ log.error(err); ModelUtil.emitError(self.eventEmitter, {message: err, status: -1}, null); } else if(res&&res.length>0){ ModelUtil.emitOK(self.eventEmitter, {sessionId: res[0].id, status: 200}); }else { ModelUtil.emitOK(self.eventEmitter, {sessionId: '', status: 200}); } }); } getSession(sessionId,userId,handler){ let self = this; let sessionKey = RedisModel.makeRedisKey(REDIS_KEYS.Session, sessionId); redis.hgetallAsync(sessionKey).then(function(session){ if(session.type==SESSION_TYPES.P2P){ ParticipantRepo.findNameById(userId, function (err, res) { session.name = res[0].name; if(handler){ handler(null,session); return; } ModelUtil.emitOK(self.eventEmitter, session); }) }else{ if(handler){ handler(null,session); return; } ModelUtil.emitOK(self.eventEmitter, session); } }).catch(function (err) { logger.error("Get session failed: ", err); if(handler){ handler(null,session); return; } ModelUtil.emitError(self.eventEmitter, {message: err, status: -1}, null); }) } /** *MDT 医生数据转换,根据手机号查询医生CODE */ conversionAarticipant(doctorMobile, callback) { let doctorProto = DoctorRepo; doctorProto.findByMobile(doctorMobile, function (err, oDoctor) { if(oDoctor.length>0 && oDoctor[0].id){ if(callback){ callback(oDoctor[0].id) } }else { if(callback){ logger.error("Get doctoc code failed: mobile:", doctorMobile); callback(doctorMobile) } } }); } /** * 根据用户ID获取用户的session列表 * @param userId * @param page * @param size * @param businessType */ getUserSessions(userId, page, size, businessType) { let userSessionKey = RedisModel.makeRedisKey(REDIS_KEYS.UserSessions, userId); let self = this; if (page > 0) { if (page == 1) { page = 0; } page = page + page * size; } async.waterfall([ // 获取会话ID列表 // function (callback) { // redis.zrevrangeAsync(userSessionKey, page, size) // .then(function (sessionIds) { // if (sessionIds.length == 0) { // ModelUtil.emitOK(self.eventEmitter, []); // return; // } // callback(null, sessionIds); // }) // }, function (callback) { SessionRepo.findAllByType(userId,businessType,page,size,function(err,res){ if (res.length == 0) { ModelUtil.emitOK(self.eventEmitter, []); return; } var sessionIds=[]; for(var j in res){ sessionIds.push(res[j].id); } callback(null,sessionIds); }) }, // 遍历会话 function (sessionIds) { let sessionList = []; let functionList = []; log.info("sessionIds.length:" + sessionIds.length); for (let j = 0; j < sessionIds.length; j++) { let fun = function (index, callback) { log.info("!callback:" + !callback); if (!callback) { callback = index, index = 0 } let sessionId = sessionIds[index]; let sessionKey = RedisModel.makeRedisKey(REDIS_KEYS.Session, sessionId); log.info("sessionKey:" + sessionKey); let participantsRoleKey = RedisModel.makeRedisKey(REDIS_KEYS.SessionParticipantsRole, sessionId); let sessionParticipantsKey = RedisModel.makeRedisKey(REDIS_KEYS.SessionParticipants, sessionId); redis.multi() .hgetall(sessionKey) // 会话实体 .hget(participantsRoleKey, userId) // 用户在此会话中的角色 .zscore(sessionParticipantsKey, userId) // 用户在此会话中最后一次获取未读消息的时间 .zrange(sessionParticipantsKey, 0, -1) .zrange(sessionParticipantsKey, 0,-1,'withscores') // 所有用户在此会话中最后一次获取未读消息的时间 .hgetall(participantsRoleKey) // 所有用户在此会话中角色 .execAsync() .then(function (res) { let session = res[0]; log.info("top.session.id:" + session.id); log.info("top.session.name:" + session.name); log.info("tope.session.business_type:" + session.business_type); let role = res[1]; let lastFetchTime = res[2]; let users = res[3]; let participantsTimeArray = res[4]; let userRoles = res[5]; let participantsTime = []; let isInvite = true; //处理session未加入redis的bug log.info("1.session==null:" + session==null); if(session==null){ let lastLoginTime = new Date(); SessionRepo.findOne(sessionId, function (err, res) { if(res){ session = res; log.info("1.session.id:" + session.id); log.info("1.session.name:" + session.name); log.info("1.session.business_type:" + session.business_type); let redisSession = [ "id", session.id, "name", session.name, "type", session.type, "business_type", session.business_type, "last_sender_id", session.last_sender_id||"", "last_sender_name", session.last_sender_name||"", "last_content_type", session.last_content_type||"", "last_content", session.last_content||"", "last_message_time", session.last_message_time||"", "create_date", ObjectUtil.timestampToLong(session.create_date), "status",session.status==null?0:session.status ]; // cache sessions redis.multi() .zadd(REDIS_KEYS.Sessions, lastLoginTime.getTime(), sessionId) // 会话的最后活动时间设置为此用户的登录时间 .zadd(RedisModel.makeRedisKey(REDIS_KEYS.UserSessions, userId), lastLoginTime.getTime(), sessionId) // 会话的最后活动时间设置为此用户的登录时间 .hmset(RedisModel.makeRedisKey(REDIS_KEYS.Session, sessionId), redisSession) .execAsync() .then(function (res) { // cache participants let sessionParticipantsKey = RedisModel.makeRedisKey(REDIS_KEYS.SessionParticipants, sessionId); let sessionParticipantsRoleKey = RedisModel.makeRedisKey(REDIS_KEYS.SessionParticipantsRole, sessionId); ParticipantRepo.findAll(sessionId, function (err, participants) { if (err) { ModelUtil.emitError(self.eventEmitter, err.message); return; } let multi = redis.multi(); participants.forEach(function (participant) { let participantId = participant.id; let participantRole = participant.role; let score = ObjectUtil.timestampToLong(participant.last_fetch_time||(new Date())); multi = multi.zadd(sessionParticipantsKey, score, participantId) .hset(sessionParticipantsRoleKey, participantId, participantRole); }); multi.execAsync() .then(function (res) { }) .catch(function (ex) { log.error("Login failed while caching participants: ", ex); }); }); // cache messages let messagesKey = RedisModel.makeRedisKey(REDIS_KEYS.Messages, sessionId); let messagesByTimestampKey = RedisModel.makeRedisKey(REDIS_KEYS.MessagesByTimestamp, sessionId); MessageRepo.findBySessionId(sessionId, 0, config.sessionConfig.maxMessageCount, null, function (err, messages) { if (err) { ModelUtil.emitError(self.eventEmitter, err.message); return; } let multi = redis.multi(); messages.forEach(function (message) { let msgJson = { id: message.id, sender_id: message.sender_id, sender_name: message.sender_name, timestamp: ObjectUtil.timestampToLong(message.timestamp), content_type: message.content_type, content: message.content }; multi = multi.hset(messagesKey, message.id, JSON.stringify(msgJson)) .zadd(messagesByTimestampKey, ObjectUtil.timestampToLong(message.timestamp), message.id); }); multi.execAsync() .then(function (res) { }) .catch(function (ex) { log.error("Login failed while caching messages: ", ex); }); }); // cache topics for MUC let topicsKey = RedisModel.makeRedisKey(REDIS_KEYS.Topics, sessionId); TopicRepo.findAllBySessionId(sessionId, function (err, topics) { if (err) { ModelUtil.emitError(self.eventEmitter, err.message); return; } topics.forEach(function (topic) { let topicKey = RedisModel.makeRedisKey(REDIS_KEYS.Topic, topic.id); let topicId = topic.id; let name = topic.name == null ? "" : topic.name; let createTime = ObjectUtil.timestampToLong(topic.create_time); let endBy = topic.end_by == null ? "" : topic.end_by; let endTime = topic.end_time == null ? 0 : ObjectUtil.timestampToLong(topic.end_time); let startMessageId = topic.start_message_id == null ? "" : topic.start_message_id; let endMessageId = topic.end_message_id == null ? "" : topic.end_message_id; let description = topic.description == null ? "" : topic.description; let status = topic.status == null ? 0 : topic.status; redisConn.multi() .zadd(topicsKey, createTime, topicId) .hmset(topicKey, 'name', name, 'session_id', sessionId, 'create_time', createTime, 'end_by', endBy, 'end_time', endTime, 'start_message_id', startMessageId, 'end_message_id', endMessageId, 'description', description, 'status', status) .execAsync() .catch(function (ex) { log.error("Login failed while caching topics: ", ex); }); }); }); }) .catch(function (ex) { log.error("Login failed while caching sessions: ", ex); }); } }); } for(var j in userRoles){ if(userRoles[j]==1){ isInvite = false; break; } } for(var j = 0 ;j 1 || isoffset == 1) { offset += 1; // 翻页由于闭区间,需跳过本身数据 } participants.existsParticipant(sessionId, userId, function (err, res) { log.info("1151-userId=" + userId); log.info(" res[0].exist=" + res[0].exist); log.info(!res[0].exist && userId != "system"); if (! res[0].exist && userId != "system") { handler(Error("User not found in session " + sessionId), null); } else { //将消息ID转换成分值 redis.multi() .zscore(messagesTimestampKey, startMsgId) .zscore(messagesTimestampKey, endMsgId) .hgetall(sessionKey) .zrange(sessionParticipantsKey, 0, -1) .execAsync() .then(function (res) { let startMsgScore = res[1]; let endMsgScore = res[0]; let session = res[2]; let users = res[3]; if (startMsgScore == null || endMsgScore == null || (startMsgScore == endMsgScore && isoffset == 1)) { handler(null, []); return; } //结束大于开始,正序取数据,返回的数据顺序也是逆序的,反向拉取数据, // 当end>start取出来都是空的,为了给前端获取新数据使用,一般不出现这种情况 if(endMsgScore>startMsgScore){ redis.zrangebyscoreAsync(messagesTimestampKey, startMsgScore, endMsgScore, "limit", offset, count) .then(function (res) { if (res && res.length == 0) { handler(null, []); return; } redis.hmgetAsync(messagesKey, res).then(function (messages) { messages.reverse(); handler(null, messages); }).then(function () { Sessions.updateParticipantLastFetchTime(sessionId, userId, new Date().getTime()); if(session.type == SESSION_TYPES.P2P){ for(var j in users){ if(users[j]==userId)continue; WechatClient.sendAllRead(users[j],sessionId); } }else if(session.type == SESSION_TYPES.MUC || session.type == SESSION_TYPES.PRESCRIPTION || SESSION_TYPES.PRESCRIPTION_HOSPITAL || SESSION_TYPES.COLLABORATION_HOSPITAL || SESSION_TYPES.GUIDANCE_HOSPITAL || SESSION_TYPES.GENERAL_EXPERT || SESSION_TYPES.PRESCRIPTION_HOSPITAL_VIDEO || SESSION_TYPES.MUC_VIDEO || SESSION_TYPES.ONDOOR_NURSING){ for(var j in users){ if(users[j]==userId)continue; WechatClient.sendMucAllRead(users[j],userId,sessionId); } } }) }) .catch(function (err) { logger.error("Get message by page failed: ", err); handler(err, false); }) }else{ // 从消息时间表中过滤出要获取的消息ID列表,倒序取出消息 redis.zrevrangebyscoreAsync(messagesTimestampKey, startMsgScore, endMsgScore, "limit", offset, count) .then(function (res) { if (res && res.length == 0) { handler(null, []); return; } redis.hmgetAsync(messagesKey, res).then(function (messages) { handler(null, messages); }).then(function () { Sessions.updateParticipantLastFetchTime(sessionId, userId, new Date().getTime()); if(session.type == SESSION_TYPES.P2P){ for(var j in users){ if(users[j]==userId)continue; //通知对方自己已经读取数据 WechatClient.sendAllRead(users[j],sessionId); } }else if(session.type == SESSION_TYPES.MUC || session.type == SESSION_TYPES.PRESCRIPTION || SESSION_TYPES.PRESCRIPTION_HOSPITAL || SESSION_TYPES.COLLABORATION_HOSPITAL || SESSION_TYPES.GUIDANCE_HOSPITAL || SESSION_TYPES.GENERAL_EXPERT || SESSION_TYPES.PRESCRIPTION_HOSPITAL_VIDEO || SESSION_TYPES.MUC_VIDEO || SESSION_TYPES.ONDOOR_NURSING){ for(var j in users){ if(users[j]==userId)continue; //如果是患者拉取数据告诉在线的医生患者已经读取数据 WechatClient.sendMucAllRead(users[j],userId,sessionId); } } }) }) .catch(function (err) { logger.error("Get message by page failed: ", err); handler(err, false); }) } }) } }) } /** * 获取所有会话的未读消息数。 */ getAllSessionsUnreadMessageCount(userId,type,handler) { let self = this; let count = 0; let patientCount = 0; let doctorCount = 0; let patientEndCount = 0; SessionRepo.findAll(userId, function (err, res) { // SessionRepo.findUnEndAll(userId, function (err, res) { if (err) { if(handler) { handler(err,res); return; } ModelUtil.logError("getAllSessionsUnreadMessageCount is failed", err); return; } if (res && res.length == 0) { if(handler) { handler(err,count); return; } ModelUtil.emitOK(self.eventEmitter, {count: count}); return; } for (let j in res) { //是否过滤指定类型 if( type != null){ if(type != res[j].type){ continue; } } if (res[j].type == SESSION_TYPES.SYSTEM) { if (j == res.length - 1) { if(handler){ handler(err,count); return; } ModelUtil.emitOK(self.eventEmitter, {count: count, patient: patientCount, doctor: doctorCount}); } continue; } callback(res, j, res[j]); } }); function callback(res, j, session) { self.getSessionUnreadMessageCount(res[j].id, userId, function (err, con) { if (err) { if(handler) { handler(err,count); return; } ModelUtil.logError("getAllSessionsUnreadMessageCount is failed", err); } count = count + con; if (session.business_type == SESSION_BUSINESS_TYPE.PATIENT) { if(session.status == SESSION_STATUS.ENDED){//新增判断是否咨询结束 patientEndCount = patientEndCount + con; }else{ patientCount = patientCount + con; } } else { doctorCount = doctorCount + con; } if (j == res.length - 1) { if(handler) { handler(err,count) return; } ModelUtil.emitOK(self.eventEmitter, {count: count, patient: patientCount,patientEnd: patientEndCount, doctor: doctorCount}); } }) } } /** * 获取会话的未读消息数。根据成员最后一次获取消息的时候与当前时间。 * * @param sessionId * @param userId */ getSessionUnreadMessageCount(sessionId, userId, handler) { let self = this; let messagesByTimestampKey = RedisModel.makeRedisKey(REDIS_KEYS.MessagesByTimestamp, sessionId); let participantsKey = RedisModel.makeRedisKey(REDIS_KEYS.SessionParticipants, sessionId); async.waterfall([ // 此成员最后获取消息的时间 function (callback) { redis.zscoreAsync(participantsKey, userId) .then(function (lastFetchTime) { callback(null, lastFetchTime); }) }, // 计算最后获取消息的时间之后到现在有多少条消息 function (lastFetchTime, callback) { if (!lastFetchTime) lastFetchTime = 0; let now = new Date().getTime(); redis.zcountAsync(messagesByTimestampKey, parseInt(lastFetchTime)+1, now) .then(function (count) { if (handler) { handler(null, count); } else { ModelUtil.emitOK(self.eventEmitter, {count: count}); } }) } ], function (err, res) { if (err) { if (handler) { handler(err, 0); } else { ModelUtil.emitError(self.eventEmitter, "Get session unread message count failed.") } } }); } /** * 获取会话未读消息数。根据成员最后一次获取消息的时候与当前时间。 */ getSessionUnreadMessages(sessionId, userId) { let self = this; let messagesByTimestampKey = RedisModel.makeRedisKey(REDIS_KEYS.MessagesByTimestamp, sessionId); let participantsKey = RedisModel.makeRedisKey(REDIS_KEYS.SessionParticipants, sessionId); async.waterfall([ // 此成员最后获取消息的时间 function (callback) { redis.zscoreAsync(participantsKey, userId) .then(function (lastFetchTime) { callback(null, lastFetchTime); }) }, // 最后获取消息的时间之后到现在的消息ID列表 function (lastFetchTime, callback) { if (!lastFetchTime) lastFetchTime = 0; let now = new Date().getTime(); redis.zrangebyscoreAsync(messagesByTimestampKey, lastFetchTime, now) .then(function (messageIds) { callback(null, messageIds); }) }, // 获取消息 function (messageIds, callback) { if (messageIds.length == 0) { ModelUtil.emitOK(self.eventEmitter, []); return; } let startMsgId = messageIds[0]; let endMsgId = messageIds[messageIds.length - 1]; self.getMessagesByPage(sessionId, userId, startMsgId, endMsgId, 0, messageIds.length, 0, function (err, res) { if (err) { ModelUtil.emitError(self.eventEmitter, err.message); return; } ModelUtil.emitOK(self.eventEmitter, res); }); } ], function (err, res) { if (err) { ModelUtil.emitError(self.eventEmitter, "Get session unread message count failed.") } }); } /** * 保存消息。 * * 也可以根据议题保存消息,但最终还是保存到与会话对象。 * * see also: saveMessageByTopic * * @param message * @param sessionId */ saveMessageBySession(sessionId, message) { let self = this; let messages = new Messages(); let participants = new Participants(); let sessionKey = RedisModel.makeRedisKey(REDIS_KEYS.Session, sessionId); let messageId = mongoose.Types.ObjectId().toString(); let sessionType =0; let sessionName; message.id = messageId; // 检查会话中是否存在此成员 participants.existsParticipant(sessionId, message.sender_id, function (err, res) { if (err) { ModelUtil.emitError(self.eventEmitter, "Check session participant failed: ", err); return; } logger.info("session.js--1443--res[0].exist" + res[0].exist); log.info("message.sender_id=" + message.sender_id); if ( res[0].exist || message.sender_id == "system") { redis.hmgetAsync(sessionKey, ["type", "name"]).then(function (res) { sessionType = res[0]; let sessionName = res[1]; if (sessionType == null) { self.getSessions(sessionId,function(err,res){ if (err){ logger.error("session data is error"); } else { sessionName=res[0].name; sessionType = res[0].type; } }); let participantArray = []; let participantsStr="{"; ParticipantRepo.findAll(sessionId, function (err, participants) { if (err) { ModelUtil.emitError(self.eventEmitter, err.message); return; } participants.forEach(function (participant) { let participantId = participant.id; let participantRole = participant.role; let score = ObjectUtil.timestampToLong(participant.last_fetch_time||(new Date())); participantsStr +="\""+participantId+"\":\""+participantRole+"\","; }); participantsStr = participantsStr.substring(0,participantsStr.length-1)+'}'; participantsStr = JSON.parse(participantsStr); for (let j in participantsStr) { participantArray.push(j + ":" + participantsStr[j]); } //创建session到redis self.createSessionToRedis(sessionId, sessionName, sessionType, participantArray, messageId, function (err, res) { if (err) { if (handler) { handler(err, null); } ModelUtil.emitError(self.eventEmitter, {message: err, status: -1}, null); } else { if (handler) { handler(null, res); } ModelUtil.emitOK(self.eventEmitter, {status: 200, data: res}); } }); }); } if(sessionType == SESSION_TYPES.MUC || sessionType == SESSION_TYPES.PRESCRIPTION || SESSION_TYPES.PRESCRIPTION_HOSPITAL || SESSION_TYPES.COLLABORATION_HOSPITAL || SESSION_TYPES.GUIDANCE_HOSPITAL || SESSION_TYPES.GENERAL_EXPERT || SESSION_TYPES.PRESCRIPTION_HOSPITAL_VIDEO || SESSION_TYPES.MUC_VIDEO || SESSION_TYPES.ONDOOR_NURSING){ if(message.content_type == CONTENT_TYPES.PlainText || message.content_type == CONTENT_TYPES.Image || message.content_type == CONTENT_TYPES.Audio|| message.content_type == CONTENT_TYPES.Video){ TopicRepo.findLastBySessionId(sessionId,function(err,res){ if(res&&res.length>0&&res[0].reply==0){ TopicRepo.replyTopic(message.sender_id,message.id,res[0].id,function(err,res){ if(err){ logger.error("update topic reply error"); }else{ logger.warn("update topic reply success"); } }); } }) } } // 消息保存到Redis,并更新会话最后状态、用户最后消息获取时间 messages.saveMessageToRedis(sessionId, sessionType, messageId, message); Messages.updateLastContent(sessionKey, sessionType, sessionName, message); Sessions.updateParticipantLastFetchTime(sessionId, message.sender_id, message.timestamp.getTime()); // 更新MYSQL中会话的最新状态,并保存消息 SessionRepo.updateSessionLastStatus(message.sender_id, message.sender_name, message.timestamp, message.content, message.content_type, sessionId); messages.saveMessageToMysql(sessionId, sessionType, messageId, message, function (err, res) { if (err) { ModelUtil.emitError(self.eventEmitter, {message: "Failed to save message to mysql: " + err}); } else { message.timestamp = message.timestamp.getTime(); ModelUtil.emitOK(self.eventEmitter, {count: 1, messages: [message]}); } }); }).then(function (res) { // 推送消息 ParticipantRepo.findIds(sessionId, function (err, res) { if (err) { ModelUtil.logError("Push message from session: get participant's id list failed: ", err); } else { message.session_id = sessionId; res.forEach(function (participant) { if (participant.id == message.sender_id||sessionType==SESSION_TYPES.SYSTEM){ message.sender_img = participant.avatar; callPush(res,message); } }) } }) }).catch(function (err) { ModelUtil.emitError(self.eventEmitter, {message: "Error occurred while save message to session: " + err}); }) } else { ModelUtil.emitDataNotFound(self.eventEmitter, {message: "当前会话找不到此发送者"}); } }); function callPush(participants,message){ participants.forEach(function (participant) { if (participant.id !== message.sender_id && participant.participant_role == PARTICIPANT_ROLES.HOST) { Sessions.pushNotification(participant.id, participant.name, message,sessionType); } }); } } sendTopicMessages(topicId, message) { let self = this; TopicRepo.findAllByTopicId(topicId, function (err, res) { if (err || res.length == 0) { ModelUtil.emitOK(self.eventEmitter, {status: -1, "message": "议题获取失败"}); return; } self.saveMessageByTopic(message, res[0].session_id, function (err, messageId) { if (err) { ModelUtil.emitOK(self.eventEmitter, {status: -1, "message": err}); } else { message.id = messageId; ModelUtil.emitOK(self.eventEmitter, {status: 200, "message": "发送成功", data: message}); } }); }); } /** * 保存消息 * * @param message * @param sessionId * @param handler */ saveMessageByTopic(message, sessionId, handler) { let messages = new Messages(); let participants = new Participants(); let session_key = RedisModel.makeRedisKey(REDIS_KEYS.Session, sessionId); let messageId = mongoose.Types.ObjectId().toString(); let self = this; let sessionType = 0; let sessionName = ""; let agent = message.agent;//代理人 message.id = messageId; if(!message.timestamp){ message.timestamp = new Date(); } // 发送成员必须处于会话中 participants.existsParticipant(sessionId, message.sender_id, function (err, res) { log.info("1599-message.sender_id=" + message.sender_id); log.info("res[0].exist=" + res[0].exist); if ( res[0].exist || message.sender_id == "system") { redis.hmgetAsync(session_key, ["type", "name"]).then(function (res) { sessionType = res[0]; sessionName = res[1]; if (!sessionType || !sessionName) { logger.error("Unknown session key " + session_key); if (handler) { handler(new Error("Unknown session key " + session_key));return; }; } }).then(function (res) { // 消息数据双写,并更新用户最后消息获取时间,会话新状态等 messages.saveMessageToRedis(sessionId, sessionType, messageId, message); messages.saveMessageToMysql(sessionId, sessionType, messageId, message); // 更新会话最新状态及成员最后一次消息获取时间 Sessions.updateParticipantLastFetchTime(sessionId, message.sender_id, message.timestamp.getTime()); Messages.updateLastContent(session_key, sessionType, sessionName, message); SessionRepo.updateSessionLastStatus(message.sender_id, message.sender_name, message.timestamp, message.content, message.content_type, sessionId); if (handler) { handler(null, messageId); return; } }).then(function (res) { // 推送消息 ParticipantRepo.findIds(sessionId, function (err, res) { if (err) { logger.error(err); } else { message.session_id = sessionId; res.forEach(function (participant) { if (participant.id == message.sender_id){ message.sender_img = participant.avatar; callPush(res,message); } }) } }) }).catch(function (err) { log.error(err); return; }) } else { if (handler){ handler("用户不在此会话当中!", messageId);return;} } }); function callPush(participants,message){ participants.forEach(function (participant) { if ((participant.id !== message.sender_id||message.content_type == CONTENT_TYPES.PrescriptionBloodStatus || message.content_type == CONTENT_TYPES.PrescriptionFollowupContent) && participant.participant_role == PARTICIPANT_ROLES.HOST) { Sessions.pushNotification(participant.id, participant.name, message,sessionType); } }); } } /** * 保存代理人进入的消息 * * @param message * @param sessionId * @param handler */ saveIntoMessageByTopic(message, sessionId, handler) { log.info("2--保存代理人进入的消息:"); let messages = new Messages(); let participants = new Participants(); let session_key = RedisModel.makeRedisKey(REDIS_KEYS.Session, sessionId); let messageId = mongoose.Types.ObjectId().toString(); let self = this; let sessionType = 0; let sessionName = ""; let agent = message.agent;//代理人 message.id = messageId; if(!message.timestamp){ message.timestamp = new Date(); } // 发送成员必须处于会话中 participants.existsParticipant(sessionId, message.sender_id, function (err, res) { log.info("session.js--1477--res[0].exist" + res[0].exist); log.info("message.sender_id=" + message.sender_id); if (res[0].exist || message.sender_id == "system") { redis.hmgetAsync(session_key, ["type", "name"]).then(function (res) { sessionType = res[0]; sessionName = res[1]; if (!sessionType || !sessionName) { logger.error("Unknown session key " + session_key); if (handler) { handler(new Error("Unknown session key " + session_key));return; }; } }).then(function (res) { //查找最后一条数据,如果一致就不保存 MessageRepo.findLastMessage(sessionId,sessionType,function (err, res) { if (err) { logger.error(err); } else { res.forEach(function (mes) { // if(mes.content==message.content&&message.content_type==mes.content_type){ if(mes.agent==message.agent){ handler(null, messageId); return; }else { // 消息数据双写,并更新用户最后消息获取时间,会话新状态等 messages.saveMessageToRedis(sessionId, sessionType, messageId, message); messages.saveMessageToMysql(sessionId, sessionType, messageId, message); // 更新会话最新状态及成员最后一次消息获取时间 Sessions.updateParticipantLastFetchTime(sessionId, message.sender_id, message.timestamp.getTime()); Messages.updateLastContent(session_key, sessionType, sessionName, message); SessionRepo.updateSessionLastStatus(message.sender_id, message.sender_name, message.timestamp, message.content, message.content_type, sessionId); if (handler) { handler(null, messageId); return; } } }) } }); }).then(function (res) { // 推送消息 ParticipantRepo.findDoctorIds(sessionId, function (err, res) { if (err) { logger.error(err); } else { message.session_id = sessionId; res.forEach(function (participant) { Sessions.pushIntoNotification(participant.id, participant.name, message,sessionType); }) } }) }).catch(function (err) { log.error(err); return; }) } else { log.info("4--用户不在此会话当中!"); if (handler){ handler("用户不在此会话当中!", messageId);return;} } }); } /** * 置顶操作 */ stickSession(sessionId, user) { let user_session_key = RedisModel.makeRedisKey(REDIS_KEYS.UserSessions, user); let self = this; //取出最大的session redis.zrevrangeAsync(user_session_key, 0, 0).then(function (res) { //获取该session的时间搓 redis.zscoreAsync(user_session_key, res).then(function (scoreres) { let nowtime = new Date().getTime(); //当前时间搓比redis的时间搓更早证明没有置顶过 if (scoreres <= nowtime) { //初始化置顶 redis.zaddAsync(user_session_key, STICKY_SESSION_BASE_SCORE, sessionId).then(function (res) { logger.info("stickSession:" + sessionId + ",res:" + res); ModelUtil.emitOK(self.eventEmitter, {}); }).then(function () { SessionRepo.saveStickySession(sessionId, user, STICKY_SESSION_BASE_SCORE); }) } else { //已有置顶的数据,取出来加1保存回去 scoreres = Number(scoreres) + 1; redis.zaddAsync(user_session_key, scoreres, sessionId).then(function () { logger.info("stickSession:" + sessionId + ",res:" + res); ModelUtil.emitOK(self.eventEmitter, {}); }).then(function () { SessionRepo.saveStickySession(sessionId, user, scoreres); }) } }) }) } /** * 取消会话置顶 */ cancelStickSession(sessionId, user) { let user_session_key = RedisModel.makeRedisKey(REDIS_KEYS.UserSessions, user); let participants_key = RedisModel.makeRedisKey(REDIS_KEYS.SessionParticipants, sessionId); let self = this; redis.zscoreAsync(participants_key, user).then(function (res) { if (!res) { res = new Date().getTime(); } redis.zaddAsync(user_session_key, res, sessionId).then(function (res) { logger.info("cancelStickSession:" + sessionId); ModelUtil.emitOK(self.eventEmitter, {}); }).then(function () { SessionRepo.unStickySession(sessionId, user); }); }) } /** * 更新会话参与者的最后消息获取时间。 * * @param sessionId * @param userId */ static updateParticipantLastFetchTime(sessionId, userId, score) { score = score + 1; let participantsKey = RedisModel.makeRedisKey(REDIS_KEYS.SessionParticipants, sessionId); redis.zaddAsync(participantsKey, score, userId) .then(function (res) { ParticipantRepo.updateLastFetchTime(new Date(score), sessionId, userId, function (err, res) { if (err) { logger.error("Update participant last fetch time failed: ", err); } }); }) .catch(function (err) { logger.error("Update participant last fetch time failed: ", err); }); } /** * 向用户推送通知,微信端用户直接推送消息,APP端通过个推发送通知消息。 * * @param targetUserId * @param message */ static pushNotification(targetUserId, targetUserName, message,sessionType) { let self = this; Users.isPatientId(targetUserId, function (err, isPatient) { if (isPatient) { if(config.environment!='local'){//pc版不直接发送给居民,通过redis的publish WechatClient.sendMessage(targetUserId, targetUserName, message); } message.targetUserId = targetUserId; message.targetUserName = targetUserName; message.sessionType = sessionType; message.targetType = 'patient'; } else { if(sessionType==SESSION_TYPES.P2P){ WechatClient.sendReadDoctorByDoctorId(targetUserId, message); } //告知医生新消息 WechatClient.sendSocketMessageToDoctor(targetUserId,message); let count = 0; //系统消息 // MessageRepo.getWlyyMessageCount(targetUserId,function (err,res) { // if(res){ // count = res[0].count; // } // }); //im消息 let sessions = new Sessions(); sessions.getAllSessionsUnreadMessageCount(targetUserId,null,function (err,res) { if(res){ count += res; } }); if(config.environment!='local'){//pc版不推送个推,通过redis的publish log.info("推送医生智能助手") AppClient.sendNotification(targetUserId, message,sessionType,count); } //外网pcim通过socket推送 WechatClient.sendPcImSocket(targetUserId,message,sessionType); // WlyySDK.request(targetUserId, '', '', '', '/im/common/message/messages', 'POST', function (err, res) { // let count = 0; // if(err){ // logger.error(err); // }else { // logger.error(res); // res = JSON.parse(res) // if (res.status == 200) { // let data = res.data; // count = parseInt(JSON.parse(data.imMsgCount).count) + parseInt(data.system.amount) + parseInt(data.healthIndex.amount) + parseInt(data.sign.amount); // } // } // if(config.environment!='local'){//pc版不推送个推,通过redis的publish // AppClient.sendNotification(targetUserId, message,sessionType,count); // } // //外网pcim通过socket推送 // WechatClient.sendPcImSocket(targetUserId,message,sessionType); // }); message.targetUserId = targetUserId; message.targetUserName = targetUserName; message.sessionType = sessionType; message.targetType = 'doctor'; //推送MDT,医生外层新消息 WechatClient.sendMDTSocketMessageToDoctor(targetUserId,message); } //redis发布消息 if(config.pubSubSwitch) {//接收订阅消息处理开关,本地运行和测试库单独运行时防止用户接收消息2次 pubSub.publish(config.pubChannel,JSON.stringify(message)); } }); } /** * 获取redis订阅消息,并处理 * @param targetUserId * @param targetUserName * @param message * @param sessionType */ static getRedisPushNotification(message) { if (message.targetType=='patient') { if(config.environment!='local'){//pc版接收要发给居民的消息不做处理 WechatClient.sendMessage(message.targetUserId, message.targetUserName, message); } } else { if(message.sessionType==SESSION_TYPES.P2P){ WechatClient.sendReadDoctorByDoctorId(message.targetUserId, message); } //告知医生新消息 WechatClient.sendSocketMessageToDoctor(message.targetUserId,message); if(config.environment!='local'){//pc版不推送个推 WlyySDK.request(message.targetUserId, '', '', '', '/im/common/message/messages', 'POST', function (err, res) { let count = 0; res = JSON.parse(res) if (res.status == 200) { let data = res.data; count = parseInt(JSON.parse(data.imMsgCount).count) + parseInt(data.system.amount) + parseInt(data.healthIndex.amount) + parseInt(data.sign.amount); } AppClient.sendNotification(message.targetUserId, message,message.sessionType,count); }); } } } /** * 向APP端通过socket发送通知消息。 * * @param targetUserId * @param message */ static pushIntoNotification(targetUserId, targetUserName, message,sessionType) { if(sessionType==SESSION_TYPES.P2P){ WechatClient.sendReadDoctorByDoctorId(targetUserId, message); } //告知医生新消息 WechatClient.sendSocketMessageToDoctor(targetUserId,message); } /** * 针对MUC模式更新会话的当前状态 * @param sessionId */ updateSessionStatus(sessionId,status,handler){ let self = this; let sessionKey = RedisModel.makeRedisKey(REDIS_KEYS.Session,sessionId); redis.hsetAsync(sessionKey,"status",status).then(function(res){ SessionRepo.updateSessionStatus(sessionId,status,function(err,sqlResult){ if(handler){ handler(err,sqlResult); return; } if(err){ logger.error("set session status to mysql is error !"); }else{ logger.info("set session status is success"); ModelUtil.emitOK(self.eventEmitter, []); } }); }); } /** * 针对专科医生和家医模式更新会话的名称 * @param sessionId * @param name */ updateSessionName(sessionId,name,handler){ let self = this; let sessionKey = RedisModel.makeRedisKey(REDIS_KEYS.Session,sessionId); redis.hsetAsync(sessionKey,"name",name).then(function(res){ SessionRepo.updateSessionName(sessionId,name,function(err,sqlResult){ if(handler){ handler(err,sqlResult); return; } if(err){ logger.error("set session name to mysql is error !"); ModelUtil.emitError(self.eventEmitter,"set session name to mysql is error !",err) }else{ logger.info("set session name is success"); ModelUtil.emitOK(self.eventEmitter, []); } }); }); } } // Expose class module.exports = Sessions;