participants.js 19 KB


  1. /**
  2. * 会话成员模型。
  3. */
  4. "use strict";
  5. let RedisModel = require('./../redis.model.js');
  6. let ModelUtil = require('../../util/model.util');
  7. let RedisClient = require('../../repository/redis/redis.client.js');
  8. let clientCache = require('../socket.io/client.cache').clientCache();
  9. let ParticipantRepo = require('../../repository/mysql/participant.repo');
  10. let SessionRepo = require('../../repository/mysql/session.repo');
  11. let log = require('../../util/log.js');
  12. let redis = RedisClient.redisClient().connection;
  13. let Users = require('../user/users');
  14. const REDIS_KEYS = require('../../include/commons').REDIS_KEYS;
  15. const SOCKET_TYPES = require('../../include/commons').SOCKET_TYPES;
  /**
   * Session participant model.
   *
   * Keeps three Redis structures (session participants zset, participant role
   * hash, per-user session zset) in step with the MySQL participant table.
   * Methods either report through an explicit Node-style handler or, when no
   * handler is given, through the event emitter inherited from RedisModel.
   */
  class Participants extends RedisModel {
      constructor() {
          super();
      }

      /**
       * Load the participant list of a session straight from MySQL and publish
       * it on the event emitter.
       *
       * @param sessionId session ID
       */
      getParticipants(sessionId) {
          ParticipantRepo.findAll(sessionId, (err, participants) => {
              if (err) {
                  ModelUtil.emitError(this.eventEmitter, "Get session participants error", err);
                  return;
              }
              ModelUtil.emitOK(this.eventEmitter, participants);
          });
      }

      /**
       * Load the avatars of all participants of a session and publish them on
       * the event emitter.
       *
       * @param sessionId session ID
       */
      getParticipantsAvatar(sessionId) {
          ParticipantRepo.findAllAvatars(sessionId, (err, participantsAvatars) => {
              if (err) {
                  ModelUtil.emitError(this.eventEmitter, "Get session participant's avatars error", err);
                  return;
              }
              ModelUtil.emitOK(this.eventEmitter, participantsAvatars);
          });
      }

      /**
       * Check whether a user is a participant of a session.
       *
       * @param sessionId session ID
       * @param userId user ID
       * @param handler callback forwarded to ParticipantRepo.existsParticipant
       */
      existsParticipant(sessionId, userId, handler) {
          // The previous version first read the Redis role hash, but its result
          // sat behind a dead `if (false)` branch, so MySQL always answered.
          // The extra round trip only added a failure mode: a rejected Redis
          // call left `handler` uncalled. Query MySQL directly instead.
          ParticipantRepo.existsParticipant(sessionId, userId, handler);
      }

      /**
       * Find the session shared by the given P2P members.
       *
       * @param users members to look up
       * @param handler callback forwarded to ParticipantRepo.findMucSessionIdByUser
       */
      getMucSessionIdByParticipants(users, handler) {
          ParticipantRepo.findMucSessionIdByUser(users, handler);
      }

      /**
       * Write participants to Redis in a single MULTI transaction.
       *
       * @param sessionId session ID
       * @param participantsArray participants encoded as "userId:role" strings
       * @param createDate Date whose timestamp is used as the zset score
       * @param handler callback invoked with true on success, false on failure
       */
      static saveParticipantsToRedis(sessionId, participantsArray, createDate, handler) {
          const score = createDate.getTime();

          // Arguments for the zset / hash commands, built in one pass.
          const userSessions = {};
          const sessionParticipants = [];
          const sessionParticipantsRoles = [];
          participantsArray.forEach(function (item) {
              const tokens = item.split(":");
              userSessions[RedisModel.makeRedisKey(REDIS_KEYS.UserSessions, tokens[0])] = [score, sessionId];
              sessionParticipants.push(score, tokens[0]);
              sessionParticipantsRoles.push(tokens[0], tokens[1]);
          });

          // Session participants and their roles.
          const sessionParticipantsKey = RedisModel.makeRedisKey(REDIS_KEYS.SessionParticipants, sessionId);
          const sessionParticipantsRoleKey = RedisModel.makeRedisKey(REDIS_KEYS.SessionParticipantsRole, sessionId);
          let multi = redis.multi()
              .zadd(sessionParticipantsKey, sessionParticipants)
              .hmset(sessionParticipantsRoleKey, sessionParticipantsRoles);

          // Session list of every user involved.
          Object.keys(userSessions).forEach(function (key) {
              multi = multi.zadd(key, userSessions[key]);
          });

          // then(ok, fail) rather than then().catch(): an exception thrown by
          // handler(true) must not trigger a second handler(false) call.
          multi.execAsync()
              .then(
                  function () {
                      handler(true);
                  },
                  function (ex) {
                      log.error("Save participants to redis failed: ", ex);
                      handler(false);
                  }
              )
              .catch(Participants._logCallbackFailure("saveParticipantsToRedis"));
      }

      /**
       * Create the participants in MySQL.
       *
       * @param sessionId session ID
       * @param users participants encoded as "userId:role" strings
       * @param handler callback forwarded to the repository
       */
      static saveParticipantsToMysql(sessionId, users, handler) {
          return ParticipantRepo.saveParticipantsToMysql(sessionId, users, handler);
      }

      /**
       * Remove a user from the Redis structures of a session only (MySQL is
       * left untouched).
       *
       * @param sessionId session ID
       * @param userId user ID
       * @param handler callback (err, succeeded)
       */
      static removeUserFromRedis(sessionId, userId, handler) {
          Participants._execRemoveFromRedis(sessionId, userId)
              .then(
                  function () {
                      handler(null, true);
                  },
                  function (err) {
                      handler(err, false);
                  }
              )
              .catch(Participants._logCallbackFailure("removeUserFromRedis"));
      }

      /**
       * Remove a member from a session: Redis first, then the MySQL record.
       *
       * @param sessionId session ID
       * @param userId user ID
       * @param handler optional callback (err, succeeded); without it the
       *                outcome is published on the event emitter
       */
      removeUser(sessionId, userId, handler) {
          const self = this;
          Participants._execRemoveFromRedis(sessionId, userId)
              .then(function () {
                  // Kept inside the chain so that a synchronous failure here is
                  // still reported through the failure branch below.
                  self.deleteUserFromMysql(sessionId, userId);
              })
              .then(
                  function () {
                      if (handler) {
                          handler(null, true);
                          return;
                      }
                      ModelUtil.emitOK(self.eventEmitter, {status: 200, message: "成员删除成功!"});
                  },
                  function (err) {
                      if (handler) {
                          handler(err, false);
                          return;
                      }
                      log.error("成员删除失败: ", err);
                      ModelUtil.emitError(self.eventEmitter, {status: -1, message: "成员删除失败: " + err});
                  }
              )
              .catch(Participants._logCallbackFailure("removeUser"));
      }

      /**
       * Update a user's role in a MUC session, in Redis and then in MySQL.
       * Fire-and-forget: failures are logged, nothing is reported to the caller.
       *
       * @param sessionId session ID
       * @param user user ID
       * @param role new role value
       */
      updateUser(sessionId, user, role) {
          const participantsRoleKey = RedisModel.makeRedisKey(REDIS_KEYS.SessionParticipantsRole, sessionId);
          redis.hsetAsync(participantsRoleKey, user, role)
              .then(function () {
                  ParticipantRepo.updateParticipant(sessionId, user, role, function (err) {
                      if (err) {
                          log.error("Update participant role in mysql failed: ", err);
                      }
                  });
              })
              .catch(function (err) {
                  log.error("Update participant role in redis failed: ", err);
              });
      }

      /**
       * Add one or more members to a session (Redis first, then MySQL).
       *
       * @param sessionId session ID
       * @param user a single user ID, or several IDs separated by ","
       * @param role role for a single user (defaults to 0). When several users
       *             are given every one of them is added with role 0 and this
       *             argument is ignored, as in the previous implementation.
       * @param handler optional callback (err, succeeded); without it the
       *                outcome is published on the event emitter
       */
      addUser(sessionId, user, role, handler) {
          const self = this;
          const singleRole = role ? role : 0;
          const ids = user.split(",");
          const users = ids.length === 1
              ? [user + ":" + singleRole]
              : ids.map(function (id) {
                  return id + ":0";
              });

          const fail = function (err) {
              if (handler) {
                  handler(err, false);
                  return;
              }
              ModelUtil.emitOK(self.eventEmitter, {status: -1, message: "成员添加失败"});
          };

          Participants.saveParticipantsToRedis(sessionId, users, new Date(), function (saved) {
              if (!saved) {
                  // Used to call handler(null, true), i.e. report success even
                  // though nothing had been written.
                  fail(new Error("Save participants to redis failed"));
                  return;
              }
              Participants.saveParticipantsToMysql(sessionId, users, function (err) {
                  if (err) {
                      fail(err);
                      return;
                  }
                  if (handler) {
                      handler(null, true);
                      return;
                  }
                  ModelUtil.emitOK(self.eventEmitter, {status: 200, message: "成员添加成功!"});
              });
          });
      }

      /**
       * Delete a user's participant record from MySQL.
       *
       * @param sessionId session ID
       * @param user user ID
       */
      deleteUserFromMysql(sessionId, user) {
          ParticipantRepo.deleteUserFromMysql(sessionId, user);
      }

      /**
       * React to a change of a user's login state.
       *
       * For a regular session the other members are notified; for the
       * "system" session the user's stored status is updated instead.
       *
       * @param userid resident: patient_userid, doctor: doctor_userid
       * @param clientType client type (currently unused)
       * @param status 0 = offline, 1 = online
       * @param sessionId session ID, or "system"
       */
      changUserRedisLoginStatus(userid, clientType, status, sessionId) {
          log.info("changUserRedisLoginStatus,userid:" + userid);

          // Loose comparisons on status are intentional: the original code used
          // them, so callers may be passing either numbers or numeric strings.
          if (sessionId != "system") {
              if (status == 1) {
                  this.emitSessionUsers(sessionId, userid, "online");
              } else if (status == 0) {
                  this.emitSessionUsers(sessionId, userid, "offline");
              }
              return;
          }

          // When going offline only the last "_"-separated token of the ID is
          // used; otherwise the ID is used as given.
          let targetId = userid;
          if (status == 0) {
              const parts = userid.split("_");
              targetId = parts[parts.length - 1];
          }

          Users.isPatientId(targetId, function (err, isPatient) {
              if (err) {
                  log.error("changUserRedisLoginStatus: isPatientId failed: ", err);
              }
              if (isPatient) {
                  log.info("修改患者状态" + status + "======" + targetId);
                  Users.updatePatientStatus(targetId, status);
              } else {
                  log.info("修改医生状态" + status + "======" + targetId);
                  Users.updateDoctorStatus(targetId, status);
              }
          });
      }

      /**
       * Notify the members of a session that a user went online or offline.
       *
       * @param sessionId session ID
       * @param sender_id ID of the user whose state changed
       * @param status "online" or "offline"
       */
      emitSessionUsers(sessionId, sender_id, status) {
          log.info("emitSessionUsers:sessionId:" + sessionId);
          log.info("emitSessionUsers:sender_id:" + sender_id);
          log.info("emitSessionUsers:status:" + status);

          ParticipantRepo.findIds(sessionId, function (err, res) {
              if (err) {
                  log.error("Push message from session: get participant's id list failed: ", err);
                  return;
              }

              (res || []).forEach(function (participant) {
                  Users.isPatientId(participant.id, function (err, isPatient) {
                      if (err) {
                          log.error("emitSessionUsers: isPatientId failed: ", err);
                      }

                      const message = {
                          sender_id: sender_id,
                          session_id: sessionId,
                          status: status,
                          is_online_emit: 1
                      };
                      // Push to a client only if it is currently in the cache.
                      const push = function (client) {
                          if (client) {
                              client.socket.emit('message', message);
                          }
                      };

                      if (isPatient) {
                          log.info("emitSessionUsers:isPatient" + participant.id);
                          push(clientCache.findById(participant.id));
                          push(clientCache.findById("pcpatient_" + participant.id));
                      } else {
                          log.info("emitSessionUsers:isdoctor" + participant.id);
                          push(clientCache.findByIdAndType(participant.id, SOCKET_TYPES.DOCTOR));
                          push(clientCache.findByIdAndType("pc_" + participant.id, SOCKET_TYPES.PC_DOCTOR));
                      }
                  });
              });
          });
      }

      /**
       * Change a session member: replace oldUserId by userId, or, when no
       * oldUserId is given, add userId with role "1".
       *
       * @param userId user to add
       * @param oldUserId user to replace (optional)
       * @param sessionId session ID
       */
      updateSessionUser(userId, oldUserId, sessionId) {
          this._replaceSessionUser(userId, oldUserId, sessionId, "1");
      }

      /**
       * Add an active member: same as updateSessionUser, except that a newly
       * added user gets role "0".
       *
       * @param userId user to add
       * @param oldUserId user to replace (optional)
       * @param sessionId session ID
       */
      updateSessionUser0(userId, oldUserId, sessionId) {
          this._replaceSessionUser(userId, oldUserId, sessionId, "0");
      }

      /**
       * Shared implementation of updateSessionUser / updateSessionUser0.
       * Every outcome is published through ModelUtil.emitOK with a status field.
       *
       * @param userId user to add
       * @param oldUserId user to replace (optional)
       * @param sessionId session ID
       * @param newMemberRole role used when there is no user to replace
       */
      _replaceSessionUser(userId, oldUserId, sessionId, newMemberRole) {
          const self = this;
          const emit = function (status, message) {
              ModelUtil.emitOK(self.eventEmitter, {status: status, message: message});
          };

          SessionRepo.findOne(sessionId, function (err, res) {
              if (err) {
                  emit(-1, "会话查询失败!");
                  return;
              }
              // `!res` matters: a missing result used to fall through to res[0]
              // and throw a TypeError.
              if (!res || res.length !== 1) {
                  emit(200, "用户未创建咨询!");
                  return;
              }

              const session = res[0];
              if (!oldUserId) {
                  self.addUser(session.id, userId, newMemberRole, function (err) {
                      if (err) {
                          emit(-1, "成员变更失败!");
                      } else {
                          emit(200, "成员变更成功!");
                      }
                  });
                  return;
              }

              const participantsRoleKey = RedisModel.makeRedisKey(REDIS_KEYS.SessionParticipantsRole, sessionId);
              redis.hgetAsync(participantsRoleKey, oldUserId)
                  .then(function (role) {
                      // NOTE(review): this step used to call
                      // deleteUserFromMysql(session.id, userId, callback), but
                      // that method accepts no callback, so the flow stopped
                      // here and the fetched role was never used. The error
                      // text and the role lookup indicate the new user is meant
                      // to be added with the old user's role - confirm.
                      self.addUser(session.id, userId, role ? role : 0, function (err) {
                          if (err) {
                              emit(-1, "会话成员添加失败!");
                              return;
                          }
                          self.removeUser(session.id, oldUserId, function (err) {
                              if (err) {
                                  emit(-1, "会话成员移除失败!");
                              } else {
                                  emit(200, "成员变更成功!");
                              }
                          });
                      });
                  })
                  .catch(function (err) {
                      log.error("Read participant role from redis failed: ", err);
                      emit(-1, "成员变更失败!");
                  });
          });
      }

      /**
       * Queue and execute the Redis transaction that detaches a user from a
       * session: participants zset, the user's session zset and the role hash.
       *
       * @param sessionId session ID
       * @param userId user ID
       * @returns promise returned by execAsync()
       */
      static _execRemoveFromRedis(sessionId, userId) {
          const participantsKey = RedisModel.makeRedisKey(REDIS_KEYS.SessionParticipants, sessionId);
          const userSessionKey = RedisModel.makeRedisKey(REDIS_KEYS.UserSessions, userId);
          const participantsRoleKey = RedisModel.makeRedisKey(REDIS_KEYS.SessionParticipantsRole, sessionId);
          return redis.multi()
              .zrem(participantsKey, userId)
              .zrem(userSessionKey, sessionId)
              .hdel(participantsRoleKey, userId)
              .execAsync();
      }

      /**
       * Build a rejection handler that logs an exception thrown by a caller's
       * callback, so it does not surface as an unhandled rejection.
       *
       * @param where name of the method whose callback threw
       * @returns function suitable for Promise#catch
       */
      static _logCallbackFailure(where) {
          return function (ex) {
              log.error(where + ": callback threw: ", ex);
          };
      }
  }
  455. // Expose class
  456. module.exports = Participants;