123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337 |
- /**
- * 消息模型。
- */
- "use strict";
- let MessageRepo = require('../../repository/mysql/message.repo');
- let RedisModel = require('./../redis.model.js');
- let SessionRepo = require('../../repository/mysql/session.repo');
- let ParticipantRepo = require('../../repository/mysql/participant.repo');
- let RedisClient = require('../../repository/redis/redis.client.js');
- let ModelUtil = require('../../util/model.util');
- var ObjectUtil = require("../../util/object.util.js");
- let WechatClient = require("../client/wechat.client.js");
- let AppClient = require("../client/app.client.js");
- let mongoose = require('mongoose');
- let redis = RedisClient.redisClient().connection;
- let log = require('../../util/log.js');
- let configFile = require('../../include/commons').CONFIG_FILE;
- let config = require('../../resources/config/' + configFile);
- let logger = require('../../util/log');
- let pubSub = require("../redis/pubSub.js");
- const REDIS_KEYS = require('../../include/commons').REDIS_KEYS;
- class Messages extends RedisModel {
- constructor() {
- super();
- }
- /**
- * 根据topicId获取对应的议题的信息列表
- * @param topicId
- */
- getMessagesByTopicId(topicId) {
- }
- /**
- * 分页
- * 根据topicId获取对应的议题的成员信息
- * @param topicId
- * @param page
- * @param pagesize
- */
- getMessagesByTopicIdForPage(topicId, page, pagesize) {
- }
- /**
- * 获取消息MySQL
- * @param sessionId
- * @param page
- * @param pagesize
- * @param handler
- */
- getMessageFromMySQL(sessionId, page, pagesize, messageType, handler) {
- MessageRepo.findBySessionId(sessionId, page, pagesize, messageType, handler);
- }
- /**
- * 获取消息MySQL
- * @param sessionId
- * @param page
- * @param pagesize
- * @param messageType
- */
- getMessageByType(sessionId, page, pagesize, messageType) {
- let self = this;
- page = parseInt(page);
- pagesize = parseInt(pagesize);
- MessageRepo.findBySessionId(sessionId, page, pagesize, messageType, function (err, res) {
- if (err) {
- ModelUtil.emitError(self.eventEmitter, {message: "Get message by session and type failed: " + err});
- } else {
- ModelUtil.emitOK(self.eventEmitter, {data: res});
- }
- });
- }
- /**
- * 获取单条消息
- *
- * @param messageId
- */
- getMessageById(messageId) {
- }
- /**
- * 将消息保存至REDIS,并更新会话的最后状态,清理会话的过期消息。
- *
- * PS:更新此处的消息缓存代码,记得更新Users.login()
- *
- * @param messageId
- * @param sessionId
- * @param sessionType
- * @param message
- */
- saveMessageToRedis(sessionId, sessionType, messageId, message) {
- let sessionKey = RedisModel.makeRedisKey(REDIS_KEYS.Session, sessionId);
- let messageKey = RedisModel.makeRedisKey(REDIS_KEYS.Messages, sessionId);
- let messageTimestampKey = RedisModel.makeRedisKey(REDIS_KEYS.MessagesByTimestamp, sessionId);
- let msgJson = {
- id: message.id,
- sender_id: message.sender_id,
- sender_name: message.sender_name,
- timestamp: message.timestamp.getTime(),
- content_type: message.content_type,
- content: message.content,
- business_type:message.business_type||1
- };
- redis.multi()
- .hset(messageKey, messageId, JSON.stringify(msgJson)) // 保存消息
- .zadd(messageTimestampKey, message.timestamp.getTime(), messageId) // 保存消息时间
- .execAsync()
- .then(function (res) {
- Messages.updateLastContent(sessionKey, sessionType, null, message);
- SessionRepo.updateSessionLastStatus(message.sender_id,message.sender_name,message.timestamp,message.content,message.content_type,sessionId);
- Messages.cleanOutRangeMessage(sessionId); // clean out range messages
- })
- .catch(function (ex) {
- log.error("Save message to redis failed: ", ex);
- });
- }
- /**
- *创建session发送session成功
- * @param messageId
- * @param sessionId
- * @param sessionType
- * @param message
- */
- saveMessageToRedisFromCreateSession(sessionId, messageId, message) {
- let messageKey = RedisModel.makeRedisKey(REDIS_KEYS.Messages, sessionId);
- let messageTimestampKey = RedisModel.makeRedisKey(REDIS_KEYS.MessagesByTimestamp, sessionId);
- let msgJson = {
- id: message.id,
- sender_id: message.sender_id,
- sender_name: message.sender_name,
- timestamp: message.timestamp.getTime(),
- content_type: message.content_type,
- content: message.content,
- business_type:message.business_type||1
- };
- redis.multi()
- .hset(messageKey, messageId, JSON.stringify(msgJson)) // 保存消息
- .zadd(messageTimestampKey, message.timestamp.getTime(), messageId) // 保存消息时间
- .execAsync()
- .then(function (res) {
- Messages.cleanOutRangeMessage(sessionId); // clean out range messages
- })
- .catch(function (ex) {
- log.error("Save message to redis failed: ", ex);
- });
- }
- /**
- * 内外网通信。
- * 消息内容 存系统表
- * @param message
- */
- sendMessage(message) {
- let self = this;
- let messages = new Messages();
- let messageId = mongoose.Types.ObjectId().toString();
- let sessionType =0;
- message.id = messageId;
- message.session_id = "system";
- messages.saveMessageToMysql("system", sessionType, messageId, message, function (err, res) {
- if (err) {
- ModelUtil.emitError(self.eventEmitter, {message: "Failed to save message to mysql: " + err});
- } else {
- message.timestamp = message.timestamp.getTime();
- let targetUserId = message.sender_name;
- //告知医生新消息
- WechatClient.sendSocketMessageToDoctor(targetUserId,message);
- if(config.environment!='local'){//pc版不推送个推,通过redis的publish
- AppClient.sendNotification(targetUserId, message,sessionType,1);
- }
- //外网pcim通过socket推送
- WechatClient.sendPcImSocket(targetUserId,message,sessionType);
- //redis发布消息
- if(config.pubSubSwitch) {//接收订阅消息处理开关,本地运行和测试库单独运行时防止用户接收消息2次
- pubSub.publish(config.pubChannel,JSON.stringify(message));
- }
- ModelUtil.emitOK(self.eventEmitter, {status: 200});
- }
- });
- }
- /**
- * 保存Message 到mysql
- * @param message 消息对象
- * @param sessionType
- * @param messageId
- * @param sessionId
- * @param handler
- * @type type 会话类型,1表示MUC会话,2表示P2P,3表示群会话,4表示临时讨论组
- */
- saveMessageToMysql(sessionId, sessionType, messageId, message, handler) {
- MessageRepo.save(message, sessionType, messageId, sessionId, handler);
- }
- /**
- * 更新会话最后一条消息
- *
- * @param sessionKey rediskey
- * @param sessionType
- * @param name 议题名称
- * @param message
- * @param businessType
- * @returns {*}
- */
- static updateLastContent(sessionKey, sessionType, name, message) {
- redis.hmsetAsync(sessionKey,
- "create_date", message.timestamp.getTime(),
- "last_content", message.content,
- "last_content_type", message.content_type,
- "last_sender_id", message.sender_id,
- "last_sender_name", message.sender_name
- );
- /**
- * mysql操作分离
- */
- // let sessionid = sessionKey.replace("sessions:", "");
- // SessionRepo.updateSessionLastStatus(message.sender_id,
- // message.sender_name,
- // message.timestamp,
- // message.content,
- // message.content_type,
- // sessionid, function (err, res) {
- // if (err) logger.error("Update session last status failed: ", err);
- // });
- if (name != null) redis.hsetAsync(sessionKey, "name", name);
- if (sessionType != null) redis.hsetAsync(sessionKey, "type", sessionType);
- }
- /**
- * 清理会话中超出范围的消息。
- *
- * 目前只实现超出1000条数据清理的逻辑,7天前的消息清除逻辑暂不实现。
- *
- * @param sessionId
- */
- static cleanOutRangeMessage(sessionId) {
- let maxMessageCacheCount = config.sessionConfig.maxMessageCount;
- let messageById = this.makeRedisKey(REDIS_KEYS.Messages, sessionId);
- let messagesByTimestampKey = this.makeRedisKey(REDIS_KEYS.MessagesByTimestamp, sessionId);
- redis.zcardAsync(messagesByTimestampKey).then(function (count) {
- if (count > maxMessageCacheCount) {
- redis.zrevrangeAsync(messagesByTimestampKey, maxMessageCacheCount, count - maxMessageCacheCount).then(function (idList) {
- redis.zremAsync(messagesByTimestampKey, idList).then(function (res) {
- redis.hdel(messageById, idList);
- })
- });
- }
- });
- }
- /**
- *
- * 清洗数据
- * @param message
- */
- dataMessage(sessionId) {
- let self = this;
- let messagesKey = RedisModel.makeRedisKey(REDIS_KEYS.Messages, sessionId);
- let message_timestamp_key = RedisModel.makeRedisKey(REDIS_KEYS.MessagesByTimestamp, sessionId);
- MessageRepo.findBySessionId(sessionId, 0, config.sessionConfig.maxMessageCount, null, function (err, messages) {
- if (err) {
- ModelUtil.emitError(self.eventEmitter, {status: -1, message: "没有找到对应sessionId的聊天记录----"+sessionId});
- return;
- }
- let multi = redis.multi();
- if(messages){
- messages.forEach(function (message) {
- let msgJson = {
- id: message.id,
- sender_id: message.sender_id,
- sender_name: message.sender_name,
- timestamp: ObjectUtil.timestampToLong(message.timestamp),
- content_type: message.content_type,
- content: message.content
- };
- multi = multi.hset(messagesKey, message.id, JSON.stringify(msgJson))
- .zadd(message_timestamp_key, ObjectUtil.timestampToLong(message.timestamp), message.id);
- });
- }
- multi.execAsync()
- .then(function (res) {
- ModelUtil.emitOK(self.eventEmitter, {status:200,message:"存入redis成功!"});
- })
- .catch(function (ex) {
- log.error("Login failed while caching messages: ", ex);
- ModelUtil.emitOK(self.eventEmitter, res);
- return;
- });
- });
- }
- cleanMessageLastFetchTime(sessionId,userId) {
- let self = this;
- ParticipantRepo.findLastFetchTime(sessionId,userId,function(err,res){
- if (err) {
- logger.error(err);
- return;
- } else {
- let last_fetch_time = new Date(res[0].last_fetch_time).getTime();
- last_fetch_time = last_fetch_time + 1;
- let participantsKey = RedisModel.makeRedisKey(REDIS_KEYS.SessionParticipants, sessionId);
- redis.zaddAsync(participantsKey, last_fetch_time, userId)
- .then(function (res) {
- ModelUtil.emitOK(self.eventEmitter, {status:200,message:"存入redis成功!"});
- })
- .catch(function (err) {
- logger.error("Update participant last fetch time failed: ", err);
- ModelUtil.emitOK(self.eventEmitter, res);
- return;
- });
- }
- });
- }
- }
- // Expose class
- module.exports = Messages;
|