/**
 * User collection. Manages the user list kept in Redis.
 *
 * author: Sand
 * since: 12/13/2016
 */
"use strict";

let RedisModel = require('../redis.model');
let Doctor = require('./doctor');
let Patient = require('./patient');
let ImDb = require('../../repository/mysql/db/im.db');
let ParticipantRepo = require('../../repository/mysql/participant.repo');
let DoctorRepo = require('../../repository/mysql/doctor.repo');
let PatientRepo = require('../../repository/mysql/patient.repo');
let AppStatusRepo = require('../../repository/mysql/app.status.repo');
let SessionRepo = require('../../repository/mysql/session.repo');
let TopicRepo = require('../../repository/mysql/topic.repo');
let MessageRepo = require('../../repository/mysql/message.repo');

let ModelUtil = require('../../util/model.util');
let RedisClient = require('../../repository/redis/redis.client');
let redisConn = RedisClient.redisClient().connection;

let async = require('async');
let log = require('../../util/log');

let configFile = require('../../include/commons').CONFIG_FILE;
let config = require('../../resources/config/' + configFile);

const REDIS_KEYS = require('../../include/commons').REDIS_KEYS;
const PLATFORMS = require('../../include/commons').PLATFORM;

class Users extends RedisModel {
    constructor() {
        super();

        this._key = REDIS_KEYS.Users;
    }

    /**
     * Get a user. The record is always read from MySQL because the cache
     * cannot be relied on to contain it.
     *
     * @param userId      id of the user to load
     * @param outCallback node-style callback `(err, user)`; `user` is a
     *                    Patient or Doctor instance whose name/sex/birthdate/avatar
     *                    are filled in only when a row was found
     */
    getUser(userId, outCallback) {
        let self = this;

        // Determine the user type first; it selects both the repository and the model class.
        self.isPatientId(userId, function (err, isPatient) {
            let repoProto = isPatient ? PatientRepo : DoctorRepo;

            repoProto.findOne(userId, function (err, res) {
                if (err) {
                    log.error("Get user failed: ", err);
                    outCallback(err, null);
                    return;
                }

                // A patient id yields a Patient, anything else a Doctor.
                let user = isPatient ? new Patient() : new Doctor();
                if (res && res.length > 0) {
                    user.name = res[0].name;
                    user.sex = res[0].sex;
                    user.birthdate = res[0].birthdate;
                    user.avatar = res[0].avatar;
                }

                outCallback(null, user);
            });
        });
    }

    /**
     * Get the user's WeChat status from Redis.
     *
     * The result is delivered through the model's event emitter rather than
     * a callback: the cached hash when present, otherwise a "not found" event.
     *
     * @param userId
     */
    getWechatStatus(userId) {
        let self = this;

        redisConn.hgetallAsync(self.makeRedisKey(REDIS_KEYS.UserWechatStatus, userId))
            .then(function (res) {
                if (res) {
                    ModelUtil.emitData(self.eventEmitter, res);
                } else {
                    ModelUtil.emitDataNotFound(self.eventEmitter, {"message": "User is offline, unable to get wechat status."});
                }
            })
            .catch(function (err) {
                log.error("Get wechat status failed: ", err);
            });
    }

    /**
     * Get the client App status.
     *
     * Looks in Redis first (the hash written by login()); falls back to MySQL
     * when nothing is cached.
     *
     * @param userId
     * @param outCallback node-style callback `(err, status)`; `status` is null
     *                    when neither Redis nor MySQL has a record
     */
    getAppStatus(userId, outCallback) {
        let self = this;

        // Same key that login() and updateAppStatus() write to.
        let userStatusKey = self.makeRedisKey(REDIS_KEYS.UserAppStatus, userId);

        redisConn.hgetallAsync(userStatusKey).then(function (cached) {
            if (cached) {
                outCallback(null, cached);
                return;
            }

            // Not cached: read from MySQL.
            AppStatusRepo.findOne(userId, function (err, res) {
                if (err) {
                    outCallback(err, null);
                    return;
                }

                let userStatus = null;
                if (res && res.length > 0) {
                    userStatus = {};
                    userStatus.platform = res[0].platform;
                    userStatus.token = res[0].token;
                    userStatus.client_id = res[0].client_id;
                    userStatus.app_in_bg = res[0].app_in_bg;
                    userStatus.last_login_time = res[0].last_login_time;
                }

                outCallback(null, userStatus);
            });
        }, function (err) {
            // Two-argument then(): a throw inside outCallback must not trigger a second call.
            outCallback(err, null);
        });
    }

    /**
     * Update the client App status (foreground/background flag).
     *
     * Emits a data event on success, or a "not found" event when the user has
     * no cached App status (i.e. is not logged in).
     *
     * @param userId
     * @param appInBg whether the App is currently in the background
     */
    updateAppStatus(userId, appInBg) {
        let self = this;
        let userStatusKey = self.makeRedisKey(REDIS_KEYS.UserAppStatus, userId);

        // HSET replies 0 when it overwrites an existing field, so its reply cannot
        // tell "updated" from "offline". Check that the status hash exists instead.
        redisConn.existsAsync(userStatusKey)
            .then(function (exists) {
                if (!exists) {
                    ModelUtil.emitDataNotFound(self.eventEmitter, {"message": "User is offline, unable to update app status."});
                    return null;
                }

                return redisConn.hsetAsync(userStatusKey, 'app_in_bg', appInBg).then(function () {
                    ModelUtil.emitData(self.eventEmitter, {});
                });
            })
            .catch(function (err) {
                log.error("Update app status failed: ", err);
            });
    }

    /**
     * User login.
     *
     * Caches the user's own information and App/WeChat status, then starts
     * loading the user's sessions together with their participants, messages
     * and topics into Redis.
     *
     * Nothing is returned; completion is signalled through the model's event
     * emitter. The success event is emitted as soon as session caching has been
     * started, not when it has finished.
     *
     * @param userId
     * @param platform login platform; anything other than PLATFORMS.Wechat is treated as an App login
     * @param token
     * @param clientId
     */
    login(userId, platform, token, clientId) {
        let self = this;

        let loginFromApp = platform !== PLATFORMS.Wechat;

        let usersKey = REDIS_KEYS.Users;
        let userKey = self.makeRedisKey(REDIS_KEYS.User, userId);
        let userStatusKey = self.makeRedisKey(loginFromApp ? REDIS_KEYS.UserAppStatus : REDIS_KEYS.UserWechatStatus, userId);
        let lastLoginTime = new Date();

        async.waterfall([
            // get user info from mysql
            function (callback) {
                self.getUser(userId, function (err, userInfo) {
                    if (err || !userInfo) {
                        ModelUtil.emitDataNotFound(self.eventEmitter, 'User not exists.');
                        return;
                    }

                    callback(null, userInfo);
                });
            },
            // cache user info and app/wechat status
            function (userInfo, callback) {
                let multi = redisConn.multi()
                    // Score is the full epoch timestamp; getMilliseconds() would only give 0-999.
                    .zadd(usersKey, lastLoginTime.getTime(), userId)
                    .hmset(userKey,
                        'avatar', userInfo.avatar ? userInfo.avatar : '',
                        'birthdate', userInfo.birthdate ? userInfo.birthdate : '',
                        'name', userInfo.name ? userInfo.name : '',
                        'role', loginFromApp ? 'doctor' : 'patient');

                if (loginFromApp) {
                    // cache app status
                    multi = multi.hmset(userStatusKey,
                        'platform', platform,
                        'app_in_bg', false,
                        'client_id', clientId,
                        'token', token,
                        'last_login_time', lastLoginTime);
                } else {
                    // cache wechat status
                    // NOTE(review): this writes open_id into the user hash, not the
                    // UserWechatStatus key that getWechatStatus() reads, and getUser()
                    // does not populate open_id - confirm the intended target.
                    multi = multi.hmset(userKey, 'open_id', userInfo.open_id ? userInfo.open_id : '');
                }

                multi.execAsync().then(function () {
                    callback(null);
                }, callback);
            },
            // cache sessions, participants, topics, messages
            function (callback) {
                SessionRepo.findAll(userId, function (err, sessions) {
                    if (err) {
                        log.error("Load sessions failed: ", err);
                        return;
                    }

                    for (let session of sessions || []) {
                        self._cacheSession(session, userId, lastLoginTime);
                    }
                });

                ModelUtil.emitData(self.eventEmitter, {});
                callback(null);
            }
        ], function (err) {
            if (err) {
                log.error("User login failed: ", err);
            }
        });
    }

    /**
     * Cache one session and, once it is stored, its participants, messages and topics.
     *
     * @param session        session row (id, name, type, create_date)
     * @param userId         user the session list belongs to
     * @param lastActiveTime Date used as the session's last-activity score
     */
    _cacheSession(session, userId, lastActiveTime) {
        let self = this;
        let sessionId = session.id;

        // The session's last activity time is set to this user's login time.
        let score = lastActiveTime.getTime();

        redisConn.multi()
            .zadd(self.makeRedisKey(REDIS_KEYS.Sessions), score, sessionId)
            .zadd(self.makeRedisKey(REDIS_KEYS.UserSessions, userId), score, sessionId)
            .hmset(self.makeRedisKey(REDIS_KEYS.Session, sessionId),
                'name', session.name,
                'type', session.type,
                'create_date', session.create_date)
            .execAsync()
            .then(function () {
                self._cacheParticipants(sessionId);
                self._cacheMessages(sessionId);
                self._cacheTopics(sessionId);
            })
            .catch(function (err) {
                log.error("Cache session failed: ", err);
            });
    }

    /**
     * Cache a session's participants and their roles.
     *
     * @param sessionId
     */
    _cacheParticipants(sessionId) {
        let self = this;
        let sessionParticipantsKey = self.makeRedisKey(REDIS_KEYS.Participants, sessionId);
        let sessionParticipantsRoleKey = self.makeRedisKey(REDIS_KEYS.ParticipantsRole, sessionId);

        ParticipantRepo.findParticipants(sessionId, function (err, participants) {
            if (err) {
                log.error("Load participants failed: ", err);
                return;
            }

            for (let participant of participants || []) {
                let participantId = participant.participant_id;
                let participantRole = participant.participant_role;

                redisConn.multi()
                    // ZADD takes (key, score, member).
                    .zadd(sessionParticipantsKey, Date.now(), participantId)
                    .hset(sessionParticipantsRoleKey, participantId, participantRole)
                    .execAsync()
                    .catch(function (err) {
                        log.error("Cache participant failed: ", err);
                    });
            }
        });
    }

    /**
     * Cache a session's most recent messages (at most config.sessionConfig.maxMessageCount).
     *
     * @param sessionId
     */
    _cacheMessages(sessionId) {
        let self = this;
        let messagesKey = self.makeRedisKey(REDIS_KEYS.Messages, sessionId);
        let messagesByTimestampKey = self.makeRedisKey(REDIS_KEYS.MessagesByTimestamp, sessionId);

        MessageRepo.findBySessionId(sessionId, 0, config.sessionConfig.maxMessageCount, function (err, messages) {
            if (err) {
                log.error("Load messages failed: ", err);
                return;
            }

            for (let message of messages || []) {
                let id = message.id;
                let msgJson = {
                    sessionId: message.session_id,
                    senderId: message.sender_id,
                    senderName: message.sender_name,
                    contentType: message.content_type,
                    content: message.content,
                    timestamp: message.timestamp
                };

                redisConn.multi()
                    // Hash values must be strings, so the message is serialized.
                    .hset(messagesKey, id, JSON.stringify(msgJson))
                    // NOTE(review): assumes message.timestamp is a Date or a value
                    // `new Date()` can parse - confirm against the repository.
                    .zadd(messagesByTimestampKey, new Date(message.timestamp).getTime(), id)
                    .execAsync()
                    .catch(function (err) {
                        log.error("Cache message failed: ", err);
                    });
            }
        });
    }

    /**
     * Cache a session's topics (used for MUC sessions).
     *
     * @param sessionId
     */
    _cacheTopics(sessionId) {
        let self = this;
        let topicsKey = self.makeRedisKey(REDIS_KEYS.Topics, sessionId);

        TopicRepo.findAll(sessionId, function (err, topics) {
            if (err) {
                log.error("Load topics failed: ", err);
                return;
            }

            for (let topic of topics || []) {
                let topicId = topic.id;
                let topicKey = self.makeRedisKey(REDIS_KEYS.Topic, topicId);

                redisConn.multi()
                    // NOTE(review): the original call supplied no score; ordering by
                    // creation time is assumed here - confirm the intended ordering.
                    .zadd(topicsKey, new Date(topic.create_time).getTime(), topicId)
                    .hmset(topicKey,
                        'name', topic.name,
                        'session_id', sessionId,
                        'create_time', topic.create_time,
                        'end_by', topic.end_by,
                        'end_time', topic.end_time,
                        'start_message_id', topic.start_message_id,
                        'end_message_id', topic.end_message_id)
                    .execAsync()
                    .catch(function (err) {
                        log.error("Cache topic failed: ", err);
                    });
            }
        });
    }

    /**
     * User logout.
     *
     * Removes the user from the users set and deletes the cached user info and
     * status hash. A data event is emitted when the cleanup has been attempted,
     * whether or not it succeeded (failures are logged).
     *
     * @param userId
     */
    logout(userId) {
        let self = this;

        async.waterfall([
            function (callback) {
                self.isPatientId(userId, function (err, isPatient) {
                    callback(null, isPatient);
                });
            },
            // async.waterfall passes the previous result first and the callback last.
            function (isPatient, callback) {
                let usersKey = REDIS_KEYS.Users;
                let userKey = self.makeRedisKey(REDIS_KEYS.User, userId);
                let userStatusKey = self.makeRedisKey(isPatient ? REDIS_KEYS.UserWechatStatus : REDIS_KEYS.UserAppStatus, userId);

                redisConn.multi()
                    // Remove only this user; DEL on the users key would drop every logged-in user.
                    .zrem(usersKey, userId)
                    .del(userKey)
                    .del(userStatusKey)
                    .execAsync()
                    .then(function () {
                        callback(null);
                    }, callback);
            }],
            function (err) {
                if (err) {
                    log.error("User logout failed: ", err);
                }

                ModelUtil.emitData(self.eventEmitter, {});
            }
        );
    }

    /**
     * Check whether a user id belongs to a patient.
     *
     * @param userId
     * @param callback node-style callback `(err, isPatient)`; `err` is always null -
     *                 a failed query is logged and reported as `false`
     */
    isPatientId(userId, callback) {
        var sql = "select case when count(*) > 0 then true else false end 'is_patient' from patients where id = ?";

        ImDb.execQuery({
            "sql": sql,
            "args": [userId],
            "handler": function (err, res) {
                if (err) {
                    log.error("User id check failed: ", err);
                    callback(null, false);
                    return;
                }

                if (!res || res.length === 0) {
                    callback(null, false);
                    return;
                }

                callback(null, res[0].is_patient !== 0);
            }
        });
    }
}

let Promises = require('bluebird');
Promises.promisifyAll(Users.prototype);

module.exports = Users;