/**
 * User collection. Manages the list of users kept in Redis.
 *
 * author: Sand
 * since: 12/13/2016
 */
"use strict";

let RedisModel = require('../redis.model');
let Doctor = require('./doctor');
let Patient = require('./patient');
let ImDb = require('../../repository/mysql/db/im.db');
let ParticipantRepo = require('../../repository/mysql/participant.repo');
let DoctorRepo = require('../../repository/mysql/doctor.repo');
let PatientRepo = require('../../repository/mysql/patient.repo');
let AppStatusRepo = require('../../repository/mysql/app.status.repo');
let SessionRepo = require('../../repository/mysql/session.repo');
let TopicRepo = require('../../repository/mysql/topic.repo');
let MessageRepo = require('../../repository/mysql/message.repo');

let ModelUtil = require('../../util/model.util');
let ObjectUtil = require("../../util/object.util.js");
let RedisClient = require('../../repository/redis/redis.client');
let redisConn = RedisClient.redisClient().connection;
let async = require('async');
let log = require('../../util/log');

let configFile = require('../../include/commons').CONFIG_FILE;
let config = require('../../resources/config/' + configFile);

const REDIS_KEYS = require('../../include/commons').REDIS_KEYS;
const PLATFORMS = require('../../include/commons').PLATFORM;

/**
 * Maps null/undefined to an empty string so the value can be written to a Redis hash.
 *
 * @param value any value
 * @returns {*} "" when value is null or undefined, otherwise value itself
 */
function nullToEmpty(value) {
    return value == null ? "" : value;
}

/**
 * Loads the participants of a session from MySQL and writes them to Redis in one MULTI.
 *
 * @param sessionId session whose participants are cached
 * @param done node-style callback, called once with the first error or with null
 */
function cacheSessionParticipants(sessionId, done) {
    let participantsKey = RedisModel.makeRedisKey(REDIS_KEYS.SessionParticipants, sessionId);
    let participantsRoleKey = RedisModel.makeRedisKey(REDIS_KEYS.SessionParticipantsRole, sessionId);

    ParticipantRepo.findParticipants(sessionId, function (err, participants) {
        if (err) {
            done(err);
            return;
        }

        // Nothing to write; skip the empty transaction.
        if (participants.length === 0) {
            done(null);
            return;
        }

        let multi = redisConn.multi();
        participants.forEach(function (participant) {
            let score = ObjectUtil.timestampToLong(participant.last_fetch_time);

            multi.zadd(participantsKey, score, participant.participant_id);
            // NOTE(review): the hash field is the role and the value is the participant id, so
            // participants sharing a role overwrite each other. This looks swapped, but the layout
            // is kept as-is because readers of this key are outside this file - confirm and fix there too.
            multi.hset(participantsRoleKey, participant.participant_role, participant.participant_id);
        });

        multi.execAsync().then(function () {
            done(null);
        }, done);
    });
}

/**
 * Loads the most recent messages of a session from MySQL and writes them to Redis in one MULTI.
 * At most config.sessionConfig.maxMessageCount messages are requested from the repository.
 *
 * @param sessionId session whose messages are cached
 * @param done node-style callback, called once with the first error or with null
 */
function cacheSessionMessages(sessionId, done) {
    let messagesKey = RedisModel.makeRedisKey(REDIS_KEYS.Messages, sessionId);
    let messagesByTimestampKey = RedisModel.makeRedisKey(REDIS_KEYS.MessagesByTimestamp, sessionId);

    MessageRepo.findBySessionId(sessionId, 0, config.sessionConfig.maxMessageCount, function (err, messages) {
        if (err) {
            done(err);
            return;
        }

        if (messages.length === 0) {
            done(null);
            return;
        }

        let multi = redisConn.multi();
        messages.forEach(function (message) {
            let timestamp = ObjectUtil.timestampToLong(message.timestamp);
            let msgJson = {
                id: message.id,
                sender_id: message.sender_id,
                sender_name: message.sender_name,
                timestamp: timestamp,
                content_type: message.content_type,
                content: message.content
            };

            multi.hset(messagesKey, message.id, JSON.stringify(msgJson));
            multi.zadd(messagesByTimestampKey, timestamp, message.id);
        });

        multi.execAsync().then(function () {
            done(null);
        }, done);
    });
}

/**
 * Loads the topics of a session (used for MUC) from MySQL and writes them to Redis in one MULTI.
 *
 * @param sessionId session whose topics are cached
 * @param done node-style callback, called once with the first error or with null
 */
function cacheSessionTopics(sessionId, done) {
    let topicsKey = RedisModel.makeRedisKey(REDIS_KEYS.Topics, sessionId);

    TopicRepo.findAll(sessionId, function (err, topics) {
        if (err) {
            done(err);
            return;
        }

        if (topics.length === 0) {
            done(null);
            return;
        }

        let multi = redisConn.multi();
        topics.forEach(function (topic) {
            let topicKey = RedisModel.makeRedisKey(REDIS_KEYS.Topic, topic.id);
            let createTime = ObjectUtil.timestampToLong(topic.create_time);
            let endTime = ObjectUtil.timestampToLong(topic.end_time);

            // ZADD requires a score; the topic's creation time is used so topics sort chronologically.
            multi.zadd(topicsKey, createTime, topic.id);
            multi.hmset(topicKey,
                'name', topic.name,
                'session_id', sessionId,
                'create_time', createTime,
                'end_by', topic.end_by,
                'end_time', endTime,
                'start_message_id', topic.start_message_id,
                'end_message_id', topic.end_message_id);
        });

        multi.execAsync().then(function () {
            done(null);
        }, done);
    });
}

/**
 * Caches one session for a user: the session itself first, then its participants,
 * messages and topics in parallel.
 *
 * @param session session row as returned by SessionRepo.findAll
 * @param userId id of the user that is logging in
 * @param loginTime login time in milliseconds, used as the session's last-activity score
 * @param done node-style callback, called once with the first error or with null
 */
function cacheSession(session, userId, loginTime, done) {
    let sessionId = session.id;
    let redisSession = [
        "id", session.id,
        "name", session.name,
        "type", session.type,
        "last_sender_id", nullToEmpty(session.last_sender_id),
        "last_sender_name", nullToEmpty(session.last_sender_name),
        "last_content_type", nullToEmpty(session.last_content_type),
        "last_content", nullToEmpty(session.last_content),
        "last_message_time", nullToEmpty(session.last_message_time),
        "createDate", ObjectUtil.timestampToLong(session.create_date)
    ];

    redisConn.multi()
        // The session's last-activity time is set to this user's login time.
        .zadd(REDIS_KEYS.Sessions, loginTime, sessionId)
        .zadd(RedisModel.makeRedisKey(REDIS_KEYS.UserSessions, userId), loginTime, sessionId)
        .hmset(RedisModel.makeRedisKey(REDIS_KEYS.Session, sessionId), redisSession)
        .execAsync()
        .then(function () {
            async.parallel([
                function (next) {
                    cacheSessionParticipants(sessionId, next);
                },
                function (next) {
                    cacheSessionMessages(sessionId, next);
                },
                function (next) {
                    cacheSessionTopics(sessionId, next);
                }
            ], function (err) {
                done(err);
            });
        }, done);
}

class Users extends RedisModel {
    constructor() {
        super();
    }

    /**
     * Gets a user straight from MySQL; whether the user is present in the cache is not known here.
     *
     * The user type is determined with isPatientId(). When no row is found the callback still
     * receives a model instance of the matching type, with the basic fields left unset.
     *
     * @param userId id of the user to load
     * @param outCallback node-style callback (err, user); user is a Patient or a Doctor,
     *                    or null when the repository reported an error
     */
    getUserFromMySQL(userId, outCallback) {
        // isPatientId never reports an error: it falls back to "not a patient" on failure.
        this.isPatientId(userId, function (ignored, isPatient) {
            let repo = isPatient ? PatientRepo : DoctorRepo;

            repo.findOne(userId, function (err, rows) {
                if (err) {
                    outCallback(err, null);
                    return;
                }

                let user = isPatient ? new Patient() : new Doctor();
                if (rows && rows.length > 0) {
                    user.name = rows[0].name;
                    user.sex = rows[0].sex;
                    user.birthdate = rows[0].birthdate;
                    user.avatar = rows[0].avatar;
                }

                outCallback(null, user);
            });
        });
    }

    /**
     * Gets the user's Wechat status from Redis.
     *
     * Emits OK with the cached status hash, DataNotFound when nothing is cached for the user,
     * or Error when Redis fails.
     *
     * @param userId id of the user
     */
    getWechatStatus(userId) {
        let self = this;
        let wechatStatusKey = RedisModel.makeRedisKey(REDIS_KEYS.UserWechatStatus, userId);

        redisConn.hgetallAsync(wechatStatusKey).then(function (res) {
            if (res) {
                ModelUtil.emitOK(self.eventEmitter, res);
            } else {
                ModelUtil.emitDataNotFound(self.eventEmitter, {"message": "User is offline, unable to get wechat status."});
            }
        }, function (err) {
            ModelUtil.emitError(self.eventEmitter, err.message);
        });
    }

    /**
     * Gets the user's client App status, from Redis when cached and from MySQL otherwise.
     *
     * @param userId id of the user
     * @param outCallback node-style callback (err, status); status is the cached Redis hash,
     *                    an object built from the MySQL row, or null when no status exists
     */
    getAppStatus(userId, outCallback) {
        // Same key that login() writes and updateAppStatus() reads.
        let userStatusKey = RedisModel.makeRedisKey(REDIS_KEYS.UserAppStatus, userId);

        redisConn.hgetallAsync(userStatusKey).then(function (cached) {
            if (cached !== null) {
                outCallback(null, cached);
                return;
            }

            // Not cached: fall back to MySQL.
            AppStatusRepo.findOne(userId, function (err, rows) {
                if (err) {
                    outCallback(err, null);
                    return;
                }

                let userStatus = null;
                if (rows && rows.length > 0) {
                    userStatus = {};
                    userStatus.platform = rows[0].platform;
                    userStatus.token = rows[0].token;
                    userStatus.client_id = rows[0].client_id;
                    userStatus.app_in_bg = rows[0].app_in_bg;
                    userStatus.last_login_time = rows[0].last_login_time;
                }

                outCallback(null, userStatus);
            });
        }, function (err) {
            outCallback(err, null);
        });
    }

    /**
     * Updates the cached client App status (foreground/background flag).
     *
     * Emits OK after the update, DataNotFound when the user has no cached App status,
     * or Error when Redis fails.
     *
     * @param userId id of the user
     * @param appInBg whether the App is running in the background
     */
    updateAppStatus(userId, appInBg) {
        let self = this;
        let userStatusKey = RedisModel.makeRedisKey(REDIS_KEYS.UserAppStatus, userId);
        let onRedisError = function (err) {
            ModelUtil.emitError(self.eventEmitter, err.message);
        };

        redisConn.hgetAsync(userStatusKey, 'app_in_bg').then(function (current) {
            if (current === null) {
                ModelUtil.emitDataNotFound(self.eventEmitter, {"message": "User is offline, unable to update app status."});
                return;
            }

            redisConn.hsetAsync(userStatusKey, 'app_in_bg', appInBg).then(function () {
                ModelUtil.emitOK(self.eventEmitter, {});
            }, onRedisError);
        }, onRedisError);
    }

    /**
     * User login. Only the user's client status is cached, not the user's basic information.
     *
     * On login the data related to the user is loaded into Redis: the session list, each
     * session's participants, messages and topics, and the user's own App or Wechat status.
     * Exactly one terminal event is emitted: OK once everything has been cached, or Error
     * with the message of the first failure.
     *
     * TODO: if the user is already logged in but logs in again after an abnormal exit,
     * should the status information be refreshed?
     *
     * @param userId id of the user
     * @param platform client platform; anything other than PLATFORMS.Wechat is treated as an App login
     * @param token push token of the client (App login only)
     * @param clientId client id (App login only)
     */
    login(userId, platform, token, clientId) {
        let self = this;
        let loginFromApp = platform !== PLATFORMS.Wechat;
        let usersKey = REDIS_KEYS.Users;
        let userStatusKey = RedisModel.makeRedisKey(loginFromApp ? REDIS_KEYS.UserAppStatus : REDIS_KEYS.UserWechatStatus, userId);
        let lastLoginTime = new Date().getTime();

        async.waterfall([
            // get user info from mysql
            function (callback) {
                self.getUserFromMySQL(userId, function (err, userInfo) {
                    if (err) {
                        callback(err);
                        return;
                    }

                    if (userInfo === null) {
                        ModelUtil.emitDataNotFound(self.eventEmitter, 'User not exists.');
                        return;
                    }

                    callback(null, userInfo);
                });
            },
            // cache user app/wechat status
            function (userInfo, callback) {
                let multi = redisConn.multi().zadd(usersKey, lastLoginTime, userId);

                if (loginFromApp) {
                    // cache app status
                    multi.hmset(userStatusKey,
                        'platform', platform,
                        'app_in_bg', false,
                        'client_id', clientId,
                        'token', token,
                        'last_login_time', lastLoginTime);
                } else {
                    // cache wechat status under the key that getWechatStatus() reads and logout() deletes
                    multi.hmset(userStatusKey,
                        'open_id', nullToEmpty(userInfo.open_id),
                        'last_login_time', lastLoginTime);
                }

                multi.execAsync().then(function () {
                    callback(null);
                }, callback);
            },
            // cache sessions, participants, topics, messages
            function (callback) {
                SessionRepo.findAll(userId, function (err, sessions) {
                    if (err) {
                        callback(err);
                        return;
                    }

                    async.each(sessions, function (session, next) {
                        cacheSession(session, userId, lastLoginTime, next);
                    }, callback);
                });
            }
        ], function (err) {
            if (err) {
                ModelUtil.emitError(self.eventEmitter, err.message);
                return;
            }

            ModelUtil.emitOK(self.eventEmitter, {});
        });
    }

    /**
     * User logout. Removes the user from the user list and deletes the cached status
     * (Wechat status for patients, App status otherwise).
     *
     * Emits DataNotFound when the user was not in the user list, OK otherwise,
     * or Error when Redis fails.
     *
     * @param userId id of the user
     */
    logout(userId) {
        let self = this;

        self.isPatientId(userId, function (ignored, isPatient) {
            let usersKey = REDIS_KEYS.Users;
            let userStatusKey = RedisModel.makeRedisKey(isPatient ? REDIS_KEYS.UserWechatStatus : REDIS_KEYS.UserAppStatus, userId);

            redisConn.multi()
                .zrem(usersKey, userId)
                .del(userStatusKey)
                .execAsync()
                .then(function (res) {
                    // res[0] is the ZREM reply: 0 means the user was not in the user list.
                    if (res.length > 0 && res[0] === 0) {
                        ModelUtil.emitDataNotFound(self.eventEmitter, {message: "User not found."});
                    } else {
                        ModelUtil.emitOK(self.eventEmitter, {});
                    }
                }, function (err) {
                    ModelUtil.emitError(self.eventEmitter, err.message);
                });
        });
    }

    /**
     * Checks whether a user id belongs to a patient.
     *
     * A query failure is logged and reported as "not a patient"; the callback never
     * receives an error.
     *
     * @param userId id to check
     * @param callback node-style callback (null, isPatient) where isPatient is a boolean
     */
    isPatientId(userId, callback) {
        let sql = "select case when count(*) > 0 then true else false end 'is_patient' from patients where id = ?";

        ImDb.execQuery({
            "sql": sql,
            "args": [userId],
            "handler": function (err, rows) {
                if (err) {
                    log.error("User id check failed: ", err);
                    callback(null, false);
                    return;
                }

                // An empty result set counts as "not a patient".
                let isPatient = Boolean(rows) && rows.length > 0 && Number(rows[0].is_patient) !== 0;
                callback(null, isPatient);
            }
        });
    }
}

let Promises = require('bluebird');
Promises.promisifyAll(Users.prototype);

module.exports = Users;