|
@ -45,150 +45,40 @@ class PubSub{
|
|
|
{
|
|
|
console.log("接收消息:"+message);
|
|
|
message = JSON.parse(message);
|
|
|
message.timestamp = new Date(message.timestamp);
|
|
|
let sessionId = message.session_id;
|
|
|
let sessionKey = RedisModel.makeRedisKey(REDIS_KEYS.Session, sessionId);
|
|
|
|
|
|
redis.hexistsAsync(sessionKey, sessionId).then(function(res){
|
|
|
if(res==0){
|
|
|
////新增session时 要先把session缓存到redis
|
|
|
SessionRepo.findOne(sessionId, function (err, res) {
|
|
|
if (res.length > 0) {//已经存在
|
|
|
let type = res[0].type;
|
|
|
let createDate = res[0].create_date;
|
|
|
let session = {
|
|
|
id: sessionId,
|
|
|
name: res[0].name,
|
|
|
type: type,
|
|
|
create_date: createDate.getTime(),
|
|
|
business_type: res[0].business_type
|
|
|
};
|
|
|
redis.hmsetAsync(sessionKey, session).then(function () {
|
|
|
ParticipantRepo.findParricipantBySessionId(sessionId,function (err,res) {
|
|
|
// 构造会话,成员及成员角色zset, hash所需要的数据
|
|
|
let userSessions = {};
|
|
|
let sessionParticipants = [];
|
|
|
let sessionParticipantsRoles = [];
|
|
|
res.forEach(function (item) {
|
|
|
let participant_id = item.participant_id;
|
|
|
let participant_role = item.participant_role;
|
|
|
userSessions[RedisModel.makeRedisKey(REDIS_KEYS.UserSessions, participant_id)] = [createDate.getTime(), sessionId];
|
|
|
sessionParticipants.push(createDate.getTime());
|
|
|
sessionParticipants.push(participant_id);
|
|
|
sessionParticipantsRoles.push(participant_id, participant_role);
|
|
|
});
|
|
|
// 向会话成员、会话成员角色集合中添加数据
|
|
|
let sessionParticipantsKey = RedisModel.makeRedisKey(REDIS_KEYS.SessionParticipants, sessionId);
|
|
|
let sessionParticipantsRoleKey = RedisModel.makeRedisKey(REDIS_KEYS.SessionParticipantsRole, sessionId);
|
|
|
let multi = redis.multi()
|
|
|
.zadd(sessionParticipantsKey, sessionParticipants)
|
|
|
.hmset(sessionParticipantsRoleKey, sessionParticipantsRoles);
|
|
|
if(config.pubSubSwitch){//接收订阅消息处理开关,本地运行和测试库单独运行时防止用户接收消息2次
|
|
|
//Sessions.getRedisPushNotification(message);这里不知为什么无法调用这个方法,提示getRedisPushNotification不是方法
|
|
|
if (message.targetType=='patient') {
|
|
|
if(config.environment!='local'){//pc版接收要发给居民的消息不做处理
|
|
|
WechatClient.sendMessage(message.targetUserId, message.targetUserName, message);
|
|
|
}
|
|
|
} else {
|
|
|
if(message.sessionType=="1"){
|
|
|
WechatClient.sendReadDoctorByDoctorId(message.targetUserId, message);
|
|
|
}
|
|
|
//告知医生新消息
|
|
|
WechatClient.sendSocketMessageToDoctor(message.targetUserId,message);
|
|
|
if(config.environment!='local'){//pc版不推送个推
|
|
|
WlyySDK.request(message.targetUserId, '', '', '', '/im/common/message/messages', 'POST', function (err, res) {
|
|
|
let count = 0;
|
|
|
res = JSON.parse(res)
|
|
|
if (res.status == 200) {
|
|
|
let data = res.data;
|
|
|
count = parseInt(JSON.parse(data.imMsgCount).count) + parseInt(data.system.amount) + parseInt(data.healthIndex.amount) + parseInt(data.sign.amount);
|
|
|
}
|
|
|
|
|
|
// 更新用户参与的会话列表
|
|
|
for (let key in userSessions) {
|
|
|
multi = multi.zadd(key, userSessions[key]);
|
|
|
}
|
|
|
multi.execAsync().then(function (res) {
|
|
|
sendMessage(sessionId,message,sessionKey);
|
|
|
}).catch(function (ex) {
|
|
|
logger.error("Save participants to redis failed: ", ex);
|
|
|
});
|
|
|
});
|
|
|
})
|
|
|
}
|
|
|
});
|
|
|
}else {
|
|
|
sendMessage(sessionId,message,sessionKey);
|
|
|
AppClient.sendNotification(message.targetUserId, message,message.sessionType,count);
|
|
|
});
|
|
|
}
|
|
|
//外网pcim通过socket推送
|
|
|
WechatClient.sendPcImSocket(message.targetUserId,message,message.sessionType);
|
|
|
}
|
|
|
})
|
|
|
}
|
|
|
//action(message);
|
|
|
}
|
|
|
}
|
|
|
this.alredyPublishs=[];
|
|
|
this.subConnected=false;
|
|
|
|
|
|
function sendMessage(sessionId,message,sessionKey) {
|
|
|
let participants = new Participants();
|
|
|
// 检查会话中是否存在此成员
|
|
|
participants.existsParticipant(sessionId, message.sender_id, function (err, res) {
|
|
|
if (res) {
|
|
|
redis.hmgetAsync(sessionKey, ["type", "name"]).then(function (res) {
|
|
|
let sessionType = res[0];
|
|
|
let sessionName = res[1];
|
|
|
if (sessionType) {
|
|
|
// 消息保存到Redis,并更新会话最后状态、用户最后消息获取时间
|
|
|
let sessionKey = RedisModel.makeRedisKey(REDIS_KEYS.Session, sessionId);
|
|
|
let messageKey = RedisModel.makeRedisKey(REDIS_KEYS.Messages, sessionId);
|
|
|
let messageTimestampKey = RedisModel.makeRedisKey(REDIS_KEYS.MessagesByTimestamp, sessionId);
|
|
|
|
|
|
let msgJson = {
|
|
|
id: message.id,
|
|
|
sender_id: message.sender_id,
|
|
|
sender_name: message.sender_name,
|
|
|
timestamp: message.timestamp.getTime(),
|
|
|
content_type: message.content_type,
|
|
|
content: message.content,
|
|
|
business_type:message.business_type||1
|
|
|
};
|
|
|
|
|
|
redis.multi()
|
|
|
.hset(messageKey, message.id, JSON.stringify(msgJson)) // 保存消息
|
|
|
.zadd(messageTimestampKey, message.timestamp.getTime(), message.id) // 保存消息时间
|
|
|
.execAsync()
|
|
|
.then(function (res) {
|
|
|
Messages.updateLastContent(sessionKey, sessionType, null, message);
|
|
|
Messages.cleanOutRangeMessage(sessionId); // clean out range messages
|
|
|
})
|
|
|
.catch(function (ex) {
|
|
|
logger.error("Save message to redis failed: ", ex);
|
|
|
});
|
|
|
Messages.updateLastContent(sessionKey, sessionType, sessionName, message);
|
|
|
var score = message.timestamp.getTime() + 1;
|
|
|
let participantsKey = RedisModel.makeRedisKey(REDIS_KEYS.SessionParticipants, sessionId);
|
|
|
redis.zaddAsync(participantsKey, score, message.sender_id)
|
|
|
.then(function (res) {
|
|
|
//这个代码需要消息存入缓存后才能执行,否则用户拉取消息列表时可能缺少消息
|
|
|
if(config.pubSubSwitch){//接收订阅消息处理开关,本地运行和测试库单独运行时防止用户接收消息2次
|
|
|
//Sessions.getRedisPushNotification(message);这里不知为什么无法调用这个方法,提示getRedisPushNotification不是方法
|
|
|
if (message.targetType=='patient') {
|
|
|
if(config.environment!='local'){//pc版接收要发给居民的消息不做处理
|
|
|
WechatClient.sendMessage(message.targetUserId, message.targetUserName, message);
|
|
|
}
|
|
|
} else {
|
|
|
if(message.sessionType=="1"){
|
|
|
WechatClient.sendReadDoctorByDoctorId(message.targetUserId, message);
|
|
|
}
|
|
|
//告知医生新消息
|
|
|
WechatClient.sendSocketMessageToDoctor(message.targetUserId,message);
|
|
|
if(config.environment!='local'){//pc版不推送个推
|
|
|
WlyySDK.request(message.targetUserId, '', '', '', '/im/common/message/messages', 'POST', function (err, res) {
|
|
|
let count = 0;
|
|
|
res = JSON.parse(res)
|
|
|
if (res.status == 200) {
|
|
|
let data = res.data;
|
|
|
count = parseInt(JSON.parse(data.imMsgCount).count) + parseInt(data.system.amount) + parseInt(data.healthIndex.amount) + parseInt(data.sign.amount);
|
|
|
}
|
|
|
|
|
|
AppClient.sendNotification(message.targetUserId, message,message.sessionType,count);
|
|
|
});
|
|
|
}
|
|
|
//外网pcim通过socket推送
|
|
|
WechatClient.sendPcImSocket(message.targetUserId,message,message.sessionType);
|
|
|
}
|
|
|
}
|
|
|
})
|
|
|
.catch(function (err) {
|
|
|
logger.error("Update participant last fetch time failed: ", err);
|
|
|
});
|
|
|
}
|
|
|
}).catch(function (err) {
|
|
|
logger.error({message: "Error occurred while save message to session: " + err});
|
|
|
})
|
|
|
} else {
|
|
|
logger.error({message: "当前会话找不到此发送者"});
|
|
|
}
|
|
|
});
|
|
|
}
|
|
|
}
|
|
|
|
|
|
publish(channel,message)
|