/** * 会话成员模型。 */ "use strict"; let RedisModel = require('./../redis.model.js'); let ModelUtil = require('../../util/model.util'); let RedisClient = require('../../repository/redis/redis.client.js'); let clientCache = require('../socket.io/client.cache').clientCache(); let ParticipantRepo = require('../../repository/mysql/participant.repo'); let SessionRepo = require('../../repository/mysql/session.repo'); let log = require('../../util/log.js'); let redis = RedisClient.redisClient().connection; let Users = require('../user/users'); const REDIS_KEYS = require('../../include/commons').REDIS_KEYS; const SOCKET_TYPES = require('../../include/commons').SOCKET_TYPES; class Participants extends RedisModel { constructor() { super(); } /** * 获取会话的成员列表,直接从MySQL获取。 * * @param sessionId */ getParticipants(sessionId) { let self = this; ParticipantRepo.findAll(sessionId, function (err, participants) { if (err) { ModelUtil.emitError(self.eventEmitter, "Get session participants error", err); return; } ModelUtil.emitOK(self.eventEmitter, participants); }); } /** * 获取所有成员的头像。 * * @param sessionId */ getParticipantsAvatar(sessionId) { let self = this; ParticipantRepo.findAllAvatars(sessionId, function (err, participantsAvatars) { if (err) { ModelUtil.emitError(self.eventEmitter, "Get session participant's avatars error", err); return; } ModelUtil.emitOK(self.eventEmitter, participantsAvatars); }) } /** * 会话中是否存在指定成员 * * @param sessionId * @param userId * @param handler */ existsParticipant(sessionId, userId, handler) { let participantsRoleKey = RedisModel.makeRedisKey(REDIS_KEYS.SessionParticipantsRole, sessionId); redis.hgetAsync(participantsRoleKey, userId).then(function (res) { if (false) { // get from redis handler(null, true); } else { // get from mysql ParticipantRepo.existsParticipant(sessionId, userId, handler); } }) } /** * 获取P2P成员所在会话 * * @param users * @param handler */ getMucSessionIdByParticipants(users, handler) { ParticipantRepo.findMucSessionIdByUser(users, handler); } /** * 将成员写入redis * * @param sessionId 会话ID * @param participantsArray 会话参与者集合 * @param createDate 创建日期 * @param handler 回调 */ static saveParticipantsToRedis(sessionId, participantsArray, createDate, handler) { // 构造会话,成员及成员角色zset, hash所需要的数据 let userSessions = {}; let sessionParticipants = []; let sessionParticipantsRoles = []; participantsArray.forEach(function (item) { let tokens = item.split(":"); userSessions[RedisModel.makeRedisKey(REDIS_KEYS.UserSessions, tokens[0])] = [createDate.getTime(), sessionId]; sessionParticipants.push(createDate.getTime()); sessionParticipants.push(tokens[0]); sessionParticipantsRoles.push(tokens[0], tokens[1]); }); // 向会话成员、会话成员角色集合中添加数据 let sessionParticipantsKey = RedisModel.makeRedisKey(REDIS_KEYS.SessionParticipants, sessionId); let sessionParticipantsRoleKey = RedisModel.makeRedisKey(REDIS_KEYS.SessionParticipantsRole, sessionId); let multi = redis.multi() .zadd(sessionParticipantsKey, sessionParticipants) .hmset(sessionParticipantsRoleKey, sessionParticipantsRoles); // 更新用户参与的会话列表 for (let key in userSessions) { multi = multi.zadd(key, userSessions[key]); } multi.execAsync() .then(function (res) { handler(true); }) .catch(function (ex) { handler(false); log.error("Save participants to redis failed: ", ex); }); } /** * mysql成员创建 * * @param sessionId * @param users * @param handler */ static saveParticipantsToMysql(sessionId, users, handler) { return ParticipantRepo.saveParticipantsToMysql(sessionId, users, handler); } static removeUserFromRedis(sessionId,userId,handler){ let self = this; let participants_key = RedisModel.makeRedisKey(REDIS_KEYS.SessionParticipants, sessionId); let user_session_key = RedisModel.makeRedisKey(REDIS_KEYS.UserSessions, userId); let participants_role_key = RedisModel.makeRedisKey(REDIS_KEYS.SessionParticipantsRole, sessionId); redis.multi() .zrem(participants_key, userId) .zrem(user_session_key, sessionId) .hdel(participants_role_key,userId) .execAsync() .then(function (res) { handler(null,true);return; }).catch(function(err){ handler(err,false);return; }) } /** * 移除成员 * @param sessionId * @param userId */ removeUser(sessionId, userId,handler) { let self = this; let participants_key = RedisModel.makeRedisKey(REDIS_KEYS.SessionParticipants, sessionId); let user_session_key = RedisModel.makeRedisKey(REDIS_KEYS.UserSessions, userId); let participants_role_key = RedisModel.makeRedisKey(REDIS_KEYS.SessionParticipantsRole, sessionId); // 移除会话中的成员信息,用户的Session信息及MySQL中的记录 redis.multi() .zrem(participants_key, userId) .zrem(user_session_key, sessionId) .hdel(participants_role_key,userId) .execAsync() .then(function (res) { self.deleteUserFromMysql(sessionId, userId); if(handler){ handler(null,true);return; } ModelUtil.emitOK(self.eventEmitter, {status:200,message:"成员删除成功!"}); }) .catch(function (err) { if(handler){ handler(err,false);return; } log.error("成员删除失败: ", err); ModelUtil.emitError(self.eventEmitter, {status:-1,message: "成员删除失败: " + err}); }); } /** * 更新用户在MUC模式中的状态 * @param sessionId 会话ID * @param user 用户 * @param role 变更状态 */ updateUser(sessionId, user, role) { let participantsRoleKey = RedisModel.makeRedisKey(REDIS_KEYS.SessionParticipantsRole, sessionId); redis.hsetAsync(participantsRoleKey, user, role) .then(function (res) { ParticipantRepo.updateParticipant(sessionId, user, role, function (err, res) { }); }) } /** * 添加讨论组成员 * @param sessionId * @param user */ addUser(sessionId, user,role,handler) { let self = this; if(!role)role = 0; let users = []; if(user.split(",").length==1){ users = [user+":"+role]; }else { //添加多个成员 let participants = user.split(","); for (let j in participants) { users.push( participants[j] +":0"); } } Participants.saveParticipantsToRedis(sessionId, users, new Date(), function (res) { if (res) { Participants.saveParticipantsToMysql(sessionId, users,function(err,res){ if(err){ if(handler){ handler(err,false);return; } ModelUtil.emitOK(self.eventEmitter, {status:-1,message: "成员添加失败"}); }else{ if(handler){ handler(null,true);return; } ModelUtil.emitOK(self.eventEmitter, {status:200,message: "成员添加成功!"}); } }); } else { if(handler){ handler(null,true);return; } ModelUtil.emitOK(self.eventEmitter, {status:-1,message: "成员添加失败"}); } }) } /** * user从数据库中删除 * @param sessionId 会话 * @param user 用户 */ deleteUserFromMysql(sessionId, user) { ParticipantRepo.deleteUserFromMysql(sessionId, user); } /** * 改变登陆者的在线状态 * @param userid 居民 patient_userid, 医生 doctor_userid * @param status 0离线 1在线 */ changUserRedisLoginStatus(userid,clientType,status,sessionId){ let participants = new Participants(); log.info("changUserRedisLoginStatus,userid:"+userid) log.info("changUserRedisLoginStatus,userid:"+userid) log.info("changUserRedisLoginStatus,userid:"+userid) if(sessionId != "system"){ if(status==1){ this.emitSessionUsers(sessionId,userid,"online"); }else if(status==0) { this.emitSessionUsers(sessionId,userid,"offline"); } }else { if(status==0){ let arr = userid.split("_"); let userId = arr[arr.length-1] //修改用户状态 Users.isPatientId(userId, function (err, isPatient) { if (isPatient) { log.info("修改患者状态"+status+"======"+userId) Users.updatePatientStatus(userId,status); } else { log.info("修改医生状态"+status+"======"+userId) Users.updateDoctorStatus(userId,status); } }); }else { //修改用户状态 Users.isPatientId(userid, function (err, isPatient) { if (isPatient) { log.info("修改患者状态"+status+"======"+userid) Users.updatePatientStatus(userid,status); } else { log.info("修改医生状态"+status+"======"+userid) Users.updateDoctorStatus(userid,status); } }); } } } /** * 通知会话的其他成员离线,上线消息 * @param sessionId 会话 * @param sender_id 发送人 * @param status online,offline */ emitSessionUsers(sessionId,sender_id,status) { log.info("emitSessionUsers:sessionId:"+sessionId); log.info("emitSessionUsers:sender_id:"+sender_id); log.info("emitSessionUsers:status:"+status); // 推送消息 ParticipantRepo.findIds(sessionId, function (err, res) { if (err) { log.error("Push message from session: get participant's id list failed: ", err); return; } else { res.forEach(function (participant) { Users.isPatientId(participant.id, function (err, isPatient) { let message = { sender_id:sender_id, session_id:sessionId, status:status, is_online_emit:1 }; if (isPatient) { log.info("emitSessionUsers:isPatient"+participant.id); let patientClient = clientCache.findById(participant.id); if(patientClient){ patientClient.socket.emit('message', message); } let pc_patientClient = clientCache.findById("pcpatient_"+participant.id); if(pc_patientClient){ pc_patientClient.socket.emit('message', message); } /*Users.updatePatientStatus(participant.id,status,function(err,sqlResult){ if(handler){ handler(err,sqlResult); return; } if(err){ logger.error("set session status to mysql is error !"); }else{ logger.info("set session status is success"); ModelUtil.emitOK(self.eventEmitter, []); } });*/ } else { log.info("emitSessionUsers:isdoctor"+participant.id); let doctorClient = clientCache.findByIdAndType(participant.id,SOCKET_TYPES.DOCTOR); if(doctorClient){ doctorClient.socket.emit('message', message); } let pc_doctorClient = clientCache.findByIdAndType("pc_"+participant.id,SOCKET_TYPES.PC_DOCTOR); if(pc_doctorClient){ pc_doctorClient.socket.emit('message',message); } /*log.info("修改医生状态"+status+"======"+participant.id) Users.updateDoctorStatus(participant.id,status,function(err,sqlResult){ if(handler){ handler(err,sqlResult); return; } if(err){ logger.error("set session status to mysql is error !"); }else{ logger.info("set session status is success"); ModelUtil.emitOK(self.eventEmitter, []); } });*/ } }); }) } }) } updateSessionUser(userId,oldUserId,sessionId){ let self = this; let participantsRoleKey = RedisModel.makeRedisKey(REDIS_KEYS.SessionParticipantsRole, sessionId); SessionRepo.findOne(sessionId,function(err,res){ if(err){ ModelUtil.emitOK(self.eventEmitter, {status:-1,message: "会话查询失败!"}); return; } if(res && res.length!=1){ ModelUtil.emitOK(self.eventEmitter, {status:200,message: "用户未创建咨询!"}); return; }else{ let session = res[0]; if(oldUserId){ redis.hgetAsync(participantsRoleKey,oldUserId).then(function(role){ if(!role)role = 0; self.deleteUserFromMysql(session.id,userId,function(err,addResult){ if(err){ ModelUtil.emitOK(self.eventEmitter, {status:-1,message: "会话成员添加失败!"}); return; } self.removeUser(session.id,oldUserId,function(err,res){ if(err){ ModelUtil.emitOK(self.eventEmitter, {status:-1,message: "会话成员移除失败!"}); return; }else{ ModelUtil.emitOK(self.eventEmitter, {status:200,message: "成员变更成功!"}); return; } }) }); }) }else{ self.addUser(session.id,userId,"1",function(err,addResult){ if(!err){ ModelUtil.emitOK(self.eventEmitter, {status:200,message: "成员变更成功!"}); return; }else{ ModelUtil.emitOK(self.eventEmitter, {status:-1,message: "成员变更失败!"}); return; } }); } } }) } /** * 新增活跃成员 * @param userId * @param oldUserId * @param sessionId */ updateSessionUser0(userId,oldUserId,sessionId){ let self = this; let participantsRoleKey = RedisModel.makeRedisKey(REDIS_KEYS.SessionParticipantsRole, sessionId); SessionRepo.findOne(sessionId,function(err,res){ if(err){ ModelUtil.emitOK(self.eventEmitter, {status:-1,message: "会话查询失败!"}); return; } if(res && res.length!=1){ ModelUtil.emitOK(self.eventEmitter, {status:200,message: "用户未创建咨询!"}); return; }else{ let session = res[0]; if(oldUserId){ redis.hgetAsync(participantsRoleKey,oldUserId).then(function(role){ if(!role)role = 0; self.deleteUserFromMysql(session.id,userId,function(err,addResult){ if(err){ ModelUtil.emitOK(self.eventEmitter, {status:-1,message: "会话成员添加失败!"}); return; } self.removeUser(session.id,oldUserId,function(err,res){ if(err){ ModelUtil.emitOK(self.eventEmitter, {status:-1,message: "会话成员移除失败!"}); return; }else{ ModelUtil.emitOK(self.eventEmitter, {status:200,message: "成员变更成功!"}); return; } }) }); }) }else{ self.addUser(session.id,userId,"0",function(err,addResult){ if(!err){ ModelUtil.emitOK(self.eventEmitter, {status:200,message: "成员变更成功!"}); return; }else{ ModelUtil.emitOK(self.eventEmitter, {status:-1,message: "成员变更失败!"}); return; } }); } } }) } } // Expose class module.exports = Participants;