/** * 用户集合。管理Redis中的用户列表。 * * author: Sand * since: 12/13/2016 */ "use strict"; let RedisClient = require('../../repository/redis/redis.client'); let RedisModel = require('../redis.model'); let ImDb = require('../../repository/mysql/db/im.db'); let ParticipantRepo = require('../../repository/mysql/participant.repo'); let DoctorRepo = require('../../repository/mysql/doctor.repo'); let PatientRepo = require('../../repository/mysql/patient.repo'); let SessionRepo = require('../../repository/mysql/session.repo'); let MessageRepo = require('../../repository/mysql/message.repo'); let TopicRepo = require('../../repository/mysql/topics.repo'); let ModelUtil = require('../../util/model.util'); let ObjectUtil = require("../../util/object.util.js"); let Patient = require('./patient'); let Doctor = require('./doctor'); let redisConn = RedisClient.redisClient().connection; let async = require('async'); let log = require('../../util/log'); let configFile = require('../../include/commons').CONFIG_FILE; let config = require('../../resources/config/' + configFile); const REDIS_KEYS = require('../../include/commons').REDIS_KEYS; const PLATFORMS = require('../../include/commons').PLATFORM; const SESSION_TYPE = require('../../include/commons').SESSION_TYPES; class Users extends RedisModel { constructor() { super(); } /** * 获取用户,直接从MYSQL获取,缓存是否有在不能确定。 * * @param userId * @param outCallback */ getUserFromMySQL(userId, outCallback) { let self = this; async.waterfall([ // determine user type function (callback) { Users.isPatientId(userId, function (err, isPatient) { callback(null, isPatient); }); }, // get from mysql function (isPatientId) { let repoProto = isPatientId ? PatientRepo : DoctorRepo; repoProto.findOne(userId, function (err, res) { let user = isPatientId ? new Patient() : new Doctor(); if (res.length > 0) { user.name = res[0].name; user.sex = res[0].sex; user.birthdate = res[0].birthdate; user.avatar = res[0].avatar; if(res[0].openid) user.openid = res[0].openid; } outCallback(null, user); }); } ]); } /** * 用户登录,仅缓存用户客户端状态信息,不缓存用户基本信息。 * * 用户登录时会加载与之相关的会话列表,会话消息,用户自身信息:App状态与微信状态。 * * TODO: 如果用户已经登录,但因为异常退出重新登录,是否需要刷新状态信息。 * * @param userId * @param platform * @param deviceToken * @param clientId * * @return 用户token */ login(userId, platform, deviceToken, clientId) { let self = this; let loginFromApp = platform !== PLATFORMS.Wechat; let usersKey = REDIS_KEYS.Users; let userKey = RedisModel.makeRedisKey(REDIS_KEYS.User, userId); let userStatusKey = RedisModel.makeRedisKey(loginFromApp ? REDIS_KEYS.UserAppStatus : REDIS_KEYS.UserWechatStatus, userId); let lastLoginTime = new Date(); async.waterfall([ // get user info from mysql function (callback) { self.getUserFromMySQL(userId, function (err, userInfo) { if (!userInfo) { ModelUtil.emitDataNotFound(self, 'User not exists.'); return; } callback(null, userInfo); }) }, // cache user and app/wechat status function (userInfo, callback) { let multi = redisConn.multi() .zadd(usersKey, lastLoginTime.getTime(), userId); /*.hmset(userKey, 'avatar', userInfo.avatar ? userInfo.avatar : '', 'birthdate', userInfo.birthdate ? ObjectUtil.timestampToLong(userInfo.birthdate) : '', 'name', userInfo.name, 'role', loginFromApp ? 'doctor' : 'patient');*/ if (loginFromApp) { // cache app status multi = multi.hmset(userStatusKey, 'app_in_bg', 0, 'client_id', clientId, 'device_token', deviceToken, 'last_login_time', lastLoginTime.getTime(), 'platform', platform); } else { // cache wechat status multi = multi.hmset(userStatusKey, 'last_login_time', lastLoginTime.getTime(), 'openid', userInfo.openid, 'platform', platform); } multi.execAsync() .then(function (res) { callback(null); }) .catch(function (ex) { log.error("Login failed while cache user status: ", ex); }); }, // cache sessions, participants, topics, messages function (callback) { SessionRepo.findAll(userId, function (err, sessions) { if (err) { ModelUtil.emitError(self.eventEmitter, err.message); return; } sessions.forEach(function (session) { redisConn.zscore(REDIS_KEYS.Sessions, session.id, function (err, res) { // 已经缓存过的会话不再缓存 if (res != null) return; (function (sessionId, userId) { var business_type = session.business_type; if(!session.business_type&&session.type==SESSION_TYPE.MUC){ business_type = 2 }else if(!session.business_type&&session.type!=SESSION_TYPE.MUC){ business_type = 1 } let redisSession = [ "id", session.id, "name", session.name, "type", session.type, "business_type", business_type, "last_sender_id", session.last_sender_id == null ? "" : session.last_sender_id, "last_sender_name", session.last_sender_name == null ? "" : session.last_sender_name, "last_content_type", session.last_content_type == null ? "" : session.last_content_type, "last_content", session.last_content == null ? "" : session.last_content, "last_message_time", session.last_message_time == null ? "" : session.last_message_time, "create_date", ObjectUtil.timestampToLong(session.create_date), ]; // cache sessions redisConn.multi() .zadd(REDIS_KEYS.Sessions, lastLoginTime.getTime(), sessionId) // 会话的最后活动时间设置为此用户的登录时间 .zadd(RedisModel.makeRedisKey(REDIS_KEYS.UserSessions, userId), lastLoginTime.getTime(), sessionId) // 会话的最后活动时间设置为此用户的登录时间 .hmset(RedisModel.makeRedisKey(REDIS_KEYS.Session, sessionId), redisSession) .execAsync() .then(function (res) { // cache participants let sessionParticipantsKey = RedisModel.makeRedisKey(REDIS_KEYS.SessionParticipants, sessionId); let sessionParticipantsRoleKey = RedisModel.makeRedisKey(REDIS_KEYS.SessionParticipantsRole, sessionId); ParticipantRepo.findAll(sessionId, function (err, participants) { if (err) { ModelUtil.emitError(self.eventEmitter, err.message); return; } let multi = redisConn.multi(); participants.forEach(function (participant) { let participantId = participant.id; let participantRole = participant.role; let score = ObjectUtil.timestampToLong(participant.last_fetch_time); multi = multi.zadd(sessionParticipantsKey, score, participantId) .hset(sessionParticipantsRoleKey, participantId, participantRole); }); multi.execAsync() .then(function (res) { }) .catch(function (ex) { log.error("Login failed while caching participants: ", ex); }); }); // cache messages let messagesKey = RedisModel.makeRedisKey(REDIS_KEYS.Messages, sessionId); let messagesByTimestampKey = RedisModel.makeRedisKey(REDIS_KEYS.MessagesByTimestamp, sessionId); MessageRepo.findBySessionId(sessionId, 0, config.sessionConfig.maxMessageCount, null, function (err, messages) { if (err) { ModelUtil.emitError(self.eventEmitter, err.message); return; } let multi = redisConn.multi(); messages.forEach(function (message) { let msgJson = { id: message.id, sender_id: message.sender_id, sender_name: message.sender_name, timestamp: ObjectUtil.timestampToLong(message.timestamp), content_type: message.content_type, content: message.content }; multi = multi.hset(messagesKey, message.id, JSON.stringify(msgJson)) .zadd(messagesByTimestampKey, ObjectUtil.timestampToLong(message.timestamp), message.id); }); multi.execAsync() .then(function (res) { }) .catch(function (ex) { log.error("Login failed while caching messages: ", ex); }); }); // cache topics for MUC let topicsKey = RedisModel.makeRedisKey(REDIS_KEYS.Topics, sessionId); TopicRepo.findAllBySessionId(sessionId, function (err, topics) { if (err) { ModelUtil.emitError(self.eventEmitter, err.message); return; } topics.forEach(function (topic) { let topicKey = RedisModel.makeRedisKey(REDIS_KEYS.Topic, topic.id); let topicId = topic.id; let name = topic.name == null ? "" : topic.name; let createTime = ObjectUtil.timestampToLong(topic.create_time); let endBy = topic.end_by == null ? "" : topic.end_by; let endTime = topic.end_time == null ? 0 : ObjectUtil.timestampToLong(topic.end_time); let startMessageId = topic.start_message_id == null ? "" : topic.start_message_id; let endMessageId = topic.end_message_id == null ? "" : topic.end_message_id; let description = topic.description == null ? "" : topic.description; let status = topic.status == null ? 0 : topic.status; redisConn.multi() .zadd(topicsKey, createTime, topicId) .hmset(topicKey, 'name', name, 'session_id', sessionId, 'create_time', createTime, 'end_by', endBy, 'end_time', endTime, 'start_message_id', startMessageId, 'end_message_id', endMessageId, 'description', description, 'status', status) .execAsync() .catch(function (ex) { log.error("Login failed while caching topics: ", ex); }); }); }); }) .catch(function (ex) { log.error("Login failed while caching sessions: ", ex); }); })(session.id, userId); }); }); }); callback(null, null); } ], function (err, res) { ModelUtil.emitOK(self.eventEmitter, {}); }); } logout(userId) { let self = this; async.waterfall([ function (callback) { Users.isPatientId(userId, function (err, isPatient) { callback(null, isPatient) }); }, function (isPatient, callback) { let usersKey = REDIS_KEYS.Users; let userStatusKey = RedisModel.makeRedisKey(isPatient ? REDIS_KEYS.UserWechatStatus : REDIS_KEYS.UserAppStatus, userId); redisConn.multi() .zrem(usersKey, userId) .del(userStatusKey) .execAsync() .then(function (res) { if (res.length > 0 && res[0] === 0) { ModelUtil.emitDataNotFound(self.eventEmitter, {message: "User not found."}); } else { ModelUtil.emitOK(self.eventEmitter, {}); } }) .catch(function (ex) { log.error("Logout failed: ", ex); }); }], function (err, res) { } ); } /** * 用户ID是否属于患者。 * * @param userId * @param callback */ static isPatientId(userId, callback) { async.waterfall([ function (callback) { var sql = "select case when count(*) > 0 then true else false end 'is_patient' from patients where id = ?"; ImDb.execQuery({ "sql": sql, "args": [userId], "handler": function (err, res) { if (err) callback(err, res); callback(null, res); } }); }, function (res, callback) { if (res.length === 0) return false; callback(null, res[0].is_patient); } ], function (err, res) { if (err) { log.error("User id check failed: ", err); callback(null, false); return; } callback(null, res !== 0); }); } } let Promises = require('bluebird'); Promises.promisifyAll(Users.prototype); module.exports = Users;