participants.js 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375
  1. /**
  2. * 会话成员模型。
  3. */
  4. "use strict";
  5. let RedisModel = require('./../redis.model.js');
  6. let ModelUtil = require('../../util/model.util');
  7. let RedisClient = require('../../repository/redis/redis.client.js');
  8. let clientCache = require('../socket.io/client.cache').clientCache();
  9. let ParticipantRepo = require('../../repository/mysql/participant.repo');
  10. let SessionRepo = require('../../repository/mysql/session.repo');
  11. let log = require('../../util/log.js');
  12. let redis = RedisClient.redisClient().connection;
  13. let Users = require('../user/users');
  14. const REDIS_KEYS = require('../../include/commons').REDIS_KEYS;
  15. const SOCKET_TYPES = require('../../include/commons').SOCKET_TYPES;
  /**
   * Session participant model.
   *
   * Keeps the participant data of a session in two stores:
   *  - Redis: a sorted set of participants per session, a hash of participant
   *    roles per session, and a sorted set of sessions per user;
   *  - MySQL: accessed through ParticipantRepo / SessionRepo.
   *
   * Methods without a `handler` argument report their outcome through
   * `this.eventEmitter` using ModelUtil.emitOK / ModelUtil.emitError.
   * NOTE(review): `eventEmitter` is presumably provided by RedisModel — confirm.
   */
  class Participants extends RedisModel {
      constructor() {
          super();
      }

      /**
       * Load the participant list of a session directly from MySQL and emit
       * the result (or an error) on the model's event emitter.
       *
       * @param sessionId session whose participants are requested
       */
      getParticipants(sessionId) {
          ParticipantRepo.findAll(sessionId, (err, participants) => {
              if (err) {
                  ModelUtil.emitError(this.eventEmitter, "Get session participants error", err);
                  return;
              }

              ModelUtil.emitOK(this.eventEmitter, participants);
          });
      }

      /**
       * Load the avatars of all participants of a session from MySQL and emit
       * the result (or an error) on the model's event emitter.
       *
       * @param sessionId session whose participant avatars are requested
       */
      getParticipantsAvatar(sessionId) {
          ParticipantRepo.findAllAvatars(sessionId, (err, participantsAvatars) => {
              if (err) {
                  ModelUtil.emitError(this.eventEmitter, "Get session participant's avatars error", err);
                  return;
              }

              ModelUtil.emitOK(this.eventEmitter, participantsAvatars);
          });
      }

      /**
       * Check whether a user is a participant of a session.
       *
       * The role hash in Redis is queried first, but its value is not used as
       * the answer: the MySQL repository always decides. If the Redis lookup
       * itself fails, the error is logged and MySQL is still consulted, so
       * `handler` is invoked in every case.
       *
       * @param sessionId session to look in
       * @param userId    user to look for
       * @param handler   callback forwarded to ParticipantRepo.existsParticipant
       */
      existsParticipant(sessionId, userId, handler) {
          const participantsRoleKey = RedisModel.makeRedisKey(REDIS_KEYS.SessionParticipantsRole, sessionId);
          const askMysql = () => ParticipantRepo.existsParticipant(sessionId, userId, handler);

          // Two-argument then(): the rejection branch only sees Redis failures,
          // so the MySQL query can never be issued twice for one call.
          redis.hgetAsync(participantsRoleKey, userId).then(
              () => askMysql(),
              (err) => {
                  log.error("Check participant in redis failed, falling back to MySQL: ", err);
                  askMysql();
              }
          );
      }

      /**
       * Find the session that the given users share.
       *
       * @param users   users to look up, passed through to the repository
       * @param handler callback forwarded to ParticipantRepo.findMucSessionIdByUser
       */
      getMucSessionIdByParticipants(users, handler) {
          ParticipantRepo.findMucSessionIdByUser(users, handler);
      }

      /**
       * Write participants of a session to Redis in one MULTI transaction.
       *
       * @param sessionId         session id
       * @param participantsArray array of "userId:role" strings
       * @param createDate        Date whose timestamp is used as the sorted-set score
       * @param handler           called with `true` on success, `false` on failure
       */
      static saveParticipantsToRedis(sessionId, participantsArray, createDate, handler) {
          const score = createDate.getTime();

          // Arguments for the per-session sorted set / role hash and the
          // per-user session sorted sets.
          const userSessions = {};
          const sessionParticipants = [];
          const sessionParticipantsRoles = [];
          participantsArray.forEach((item) => {
              const tokens = item.split(":");
              const userId = tokens[0];
              const role = tokens[1];

              userSessions[RedisModel.makeRedisKey(REDIS_KEYS.UserSessions, userId)] = [score, sessionId];
              sessionParticipants.push(score, userId);
              sessionParticipantsRoles.push(userId, role);
          });

          // Session participants and their roles.
          const sessionParticipantsKey = RedisModel.makeRedisKey(REDIS_KEYS.SessionParticipants, sessionId);
          const sessionParticipantsRoleKey = RedisModel.makeRedisKey(REDIS_KEYS.SessionParticipantsRole, sessionId);
          let multi = redis.multi()
              .zadd(sessionParticipantsKey, sessionParticipants)
              .hmset(sessionParticipantsRoleKey, sessionParticipantsRoles);

          // Sessions each user takes part in.
          Object.keys(userSessions).forEach((key) => {
              multi = multi.zadd(key, userSessions[key]);
          });

          multi.execAsync()
              .then(() => {
                  handler(true);
              })
              .catch((ex) => {
                  log.error("Save participants to redis failed: ", ex);
                  handler(false);
              });
      }

      /**
       * Persist participants of a session to MySQL.
       *
       * @param sessionId session id
       * @param users     array of "userId:role" strings
       * @param handler   callback forwarded to the repository
       */
      static saveParticipantsToMysql(sessionId, users, handler) {
          return ParticipantRepo.saveParticipantsToMysql(sessionId, users, handler);
      }

      /**
       * Remove a user from the Redis structures of a session only (MySQL is
       * not touched).
       *
       * @param sessionId session id
       * @param userId    user to remove
       * @param handler   called with (null, true) on success, (err, false) on failure
       */
      static removeUserFromRedis(sessionId, userId, handler) {
          const participantsKey = RedisModel.makeRedisKey(REDIS_KEYS.SessionParticipants, sessionId);
          const userSessionKey = RedisModel.makeRedisKey(REDIS_KEYS.UserSessions, userId);
          const participantsRoleKey = RedisModel.makeRedisKey(REDIS_KEYS.SessionParticipantsRole, sessionId);

          redis.multi()
              .zrem(participantsKey, userId)
              .zrem(userSessionKey, sessionId)
              .hdel(participantsRoleKey, userId)
              .execAsync()
              .then(() => {
                  handler(null, true);
              })
              .catch((err) => {
                  handler(err, false);
              });
      }

      /**
       * Remove a participant from a session: first from Redis, then (once the
       * Redis transaction succeeded) from MySQL.
       *
       * @param sessionId session id
       * @param userId    user to remove
       * @param handler   optional; called with (null, true) / (err, false).
       *                  Without it the outcome is emitted on the event emitter.
       */
      removeUser(sessionId, userId, handler) {
          const participantsKey = RedisModel.makeRedisKey(REDIS_KEYS.SessionParticipants, sessionId);
          const userSessionKey = RedisModel.makeRedisKey(REDIS_KEYS.UserSessions, userId);
          const participantsRoleKey = RedisModel.makeRedisKey(REDIS_KEYS.SessionParticipantsRole, sessionId);

          redis.multi()
              .zrem(participantsKey, userId)
              .zrem(userSessionKey, sessionId)
              .hdel(participantsRoleKey, userId)
              .execAsync()
              .then(() => {
                  this.deleteUserFromMysql(sessionId, userId);

                  if (handler) {
                      handler(null, true);
                      return;
                  }
                  ModelUtil.emitOK(this.eventEmitter, {status: 200, message: "成员删除成功!"});
              })
              .catch((err) => {
                  if (handler) {
                      handler(err, false);
                      return;
                  }
                  log.error("成员删除失败: ", err);
                  ModelUtil.emitError(this.eventEmitter, {status: -1, message: "成员删除失败: " + err});
              });
      }

      /**
       * Update the role/state of a user inside a session: the Redis role hash
       * is written first, then the MySQL record. Failures of either step are
       * logged; nothing is emitted to the caller.
       *
       * @param sessionId session id
       * @param user      user id
       * @param role      new role/state value
       */
      updateUser(sessionId, user, role) {
          const participantsRoleKey = RedisModel.makeRedisKey(REDIS_KEYS.SessionParticipantsRole, sessionId);

          redis.hsetAsync(participantsRoleKey, user, role)
              .then(() => {
                  ParticipantRepo.updateParticipant(sessionId, user, role, (err) => {
                      if (err) {
                          log.error("Update participant in MySQL failed: ", err);
                      }
                  });
              })
              .catch((err) => {
                  log.error("Update participant role in redis failed: ", err);
              });
      }

      /**
       * Add one or several members to a session (Redis first, then MySQL).
       *
       * @param sessionId session id
       * @param user      a single user id, or several ids separated by ","
       * @param role      role for a single user (defaults to 0 when falsy);
       *                  when several ids are given every one gets role 0
       * @param handler   optional; called with (null, true) on success and
       *                  (err, false) on failure. Without it the outcome is
       *                  emitted on the event emitter.
       */
      addUser(sessionId, user, role, handler) {
          const singleRole = role ? role : 0;
          const ids = user.split(",");
          const users = ids.length === 1
              ? [user + ":" + singleRole]
              : ids.map((id) => id + ":0");

          const fail = (err) => {
              if (handler) {
                  handler(err, false);
                  return;
              }
              ModelUtil.emitOK(this.eventEmitter, {status: -1, message: "成员添加失败"});
          };

          Participants.saveParticipantsToRedis(sessionId, users, new Date(), (saved) => {
              if (!saved) {
                  // A failed Redis write must be reported as a failure to the caller.
                  fail(new Error("Save participants to redis failed"));
                  return;
              }

              Participants.saveParticipantsToMysql(sessionId, users, (err) => {
                  if (err) {
                      fail(err);
                      return;
                  }

                  if (handler) {
                      handler(null, true);
                      return;
                  }
                  ModelUtil.emitOK(this.eventEmitter, {status: 200, message: "成员添加成功!"});
              });
          });
      }

      /**
       * Delete a participant record from MySQL.
       *
       * @param sessionId session id
       * @param user      user id
       */
      deleteUserFromMysql(sessionId, user) {
          ParticipantRepo.deleteUserFromMysql(sessionId, user);
      }

      /**
       * Announce a change of a user's login state to the other members of a
       * session. Nothing is sent for the "system" session.
       *
       * @param userid     id of the user whose state changed
       * @param clientType client type (currently unused)
       * @param status     0, "0" or "offline" announce "offline"; any other
       *                   value announces "online"
       * @param sessionId  session to notify
       */
      changUserRedisLoginStatus(userid, clientType, status, sessionId) {
          log.info("changUserRedisLoginStatus,userid:" + userid);

          if (sessionId != "system") {
              const isOffline = status === 0 || status === "0" || status === "offline";
              this.emitSessionUsers(sessionId, userid, isOffline ? "offline" : "online");
          }
      }

      /**
       * Push an online/offline notification to every connected client of the
       * participants of a session.
       *
       * @param sessionId session id
       * @param sender_id user the notification is about
       * @param status    "online" or "offline"
       */
      emitSessionUsers(sessionId, sender_id, status) {
          log.info("emitSessionUsers:sessionId:" + sessionId);
          log.info("emitSessionUsers:sender_id:" + sender_id);
          log.info("emitSessionUsers:status:" + status);

          // Emit only when the client is currently present in the cache.
          const pushTo = (client, message) => {
              if (client) {
                  client.socket.emit('message', message);
              }
          };

          ParticipantRepo.findIds(sessionId, (err, res) => {
              if (err) {
                  log.error("Push message from session: get participant's id list failed: ", err);
                  return;
              }

              (res || []).forEach((participant) => {
                  Users.isPatientId(participant.id, (lookupErr, isPatient) => {
                      if (lookupErr) {
                          // Routing continues as before (non-patient path); the error is no longer silent.
                          log.error("emitSessionUsers: check patient id failed: ", lookupErr);
                      }

                      const message = {
                          sender_id: sender_id,
                          session_id: sessionId,
                          status: status,
                          is_online_emit: 1
                      };

                      if (isPatient) {
                          log.info("emitSessionUsers:isPatient" + participant.id);
                          pushTo(clientCache.findById(participant.id), message);
                          pushTo(clientCache.findById("pcpatient_" + participant.id), message);
                      } else {
                          log.info("emitSessionUsers:isdoctor" + participant.id);
                          pushTo(clientCache.findByIdAndType(participant.id, SOCKET_TYPES.DOCTOR), message);
                          pushTo(clientCache.findByIdAndType("pc_" + participant.id, SOCKET_TYPES.PC_DOCTOR), message);
                      }
                  });
              });
          });
      }

      /**
       * Change a member of a session.
       *
       * Without `oldUserId` the new user is simply added with role "0".
       * With `oldUserId` the new user is added with the role stored in Redis
       * for the old user (0 when none is stored) and the old user is removed
       * afterwards. The outcome is always emitted via ModelUtil.emitOK with a
       * `status` of 200 or -1.
       *
       * @param userId    user to add
       * @param oldUserId user to replace (optional)
       * @param sessionId session id
       */
      updateSessionUser(userId, oldUserId, sessionId) {
          const emit = (status, message) => {
              ModelUtil.emitOK(this.eventEmitter, {status: status, message: message});
          };
          const participantsRoleKey = RedisModel.makeRedisKey(REDIS_KEYS.SessionParticipantsRole, sessionId);

          SessionRepo.findOne(sessionId, (err, res) => {
              if (err) {
                  emit(-1, "会话查询失败!");
                  return;
              }
              // Also covers a missing result set, which would otherwise crash on res[0].
              if (!res || res.length !== 1) {
                  emit(200, "用户未创建咨询!");
                  return;
              }

              const session = res[0];

              if (!oldUserId) {
                  this.addUser(session.id, userId, "0", (addErr) => {
                      if (!addErr) {
                          emit(200, "成员变更成功!");
                      } else {
                          emit(-1, "成员变更失败!");
                      }
                  });
                  return;
              }

              redis.hgetAsync(participantsRoleKey, oldUserId)
                  .then((storedRole) => {
                      const role = storedRole ? storedRole : 0;

                      // Add the replacement first, then drop the old member.
                      this.addUser(session.id, userId, role, (addErr) => {
                          if (addErr) {
                              emit(-1, "会话成员添加失败!");
                              return;
                          }

                          this.removeUser(session.id, oldUserId, (removeErr) => {
                              if (removeErr) {
                                  emit(-1, "会话成员移除失败!");
                                  return;
                              }
                              emit(200, "成员变更成功!");
                          });
                      });
                  })
                  .catch((roleErr) => {
                      log.error("Update session user failed: ", roleErr);
                      emit(-1, "成员变更失败!");
                  });
          });
      }
  }
  348. // Expose class
  349. module.exports = Participants;