123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131 |
- /*
- * redis发布订阅
- *example:
- * let channel="ryan";
- redis.pubSub.registerHandlers("ryan",msg=> console.log(msg));
- redis.pubSub.subscribe(channel);
- redis.pubSub.publish(channel,"hello from chen");
- */
- "use strict";
- let configFile = require('../../include/commons').CONFIG_FILE;
- let config = require('../../resources/config/' + configFile);
- let RedisModel = require('./../redis.model');
- let RedisSubClient = require('./redisSubClient');
- let RedisPubClient = require('./redisPubClient');
- let RedisClient = require('../../repository/redis/redis.client.js');
- let redisPubConn = RedisPubClient.redisClient().connection;
- let redisSubConn = RedisSubClient.redisClient().connection;
- let redis = RedisClient.redisClient().connection;
- let Sessions = require('../../models/sessions/sessions');
- let WechatClient = require("../client/wechat.client.js");
- let WlyySDK = require("../../util/wlyy.sdk");
- let AppClient = require("../client/app.client.js");
- let Participants = require('../sessions/participants');
- let Messages = require('../messages/messages');
- const REDIS_KEYS = require('../../include/commons').REDIS_KEYS;
- let ObjectUtil = require("../../util/object.util.js");
- let logger = require('../../util/log.js');
- let SessionRepo = require('../../repository/mysql/session.repo');
- let ParticipantRepo = require('../../repository/mysql/participant.repo');
- const SESSION_TYPES = require('../../include/commons').SESSION_TYPES;
- const SESSION_BUSINESS_TYPE = require('../../include/commons').SESSION_BUSINESS_TYPE;
- class PubSub{
- constructor(){
- this.sub=redisSubConn;
- this.handlers=new Map();
- this.subAction=(channle,message)=>{
- let actions= this.handlers.get(channle)||new Set();
- for(let action of actions)
- {
- console.log("接收消息:"+message);
- message = JSON.parse(message);
- if(config.pubSubSwitch){//接收订阅消息处理开关,本地运行和测试库单独运行时防止用户接收消息2次
- //Sessions.getRedisPushNotification(message);这里不知为什么无法调用这个方法,提示getRedisPushNotification不是方法
- if (message.targetType=='patient') {
- if(config.environment!='local'){//pc版接收要发给居民的消息不做处理
- WechatClient.sendMessage(message.targetUserId, message.targetUserName, message);
- }
- } else {
- if(message.sessionType=="1"){
- WechatClient.sendReadDoctorByDoctorId(message.targetUserId, message);
- }
- //告知医生新消息
- WechatClient.sendSocketMessageToDoctor(message.targetUserId,message);
- if(config.environment!='local'){//pc版不推送个推
- WlyySDK.request(message.targetUserId, '', '', '', '/im/common/message/messages', 'POST', function (err, res) {
- let count = 0;
- res = JSON.parse(res)
- if (res.status == 200) {
- let data = res.data;
- count = parseInt(JSON.parse(data.imMsgCount).count) + parseInt(data.system.amount) + parseInt(data.healthIndex.amount) + parseInt(data.sign.amount);
- }
- AppClient.sendNotification(message.targetUserId, message,message.sessionType,count);
- });
- }
- //外网pcim通过socket推送
- WechatClient.sendPcImSocket(message.targetUserId,message,message.sessionType);
- }
- }
- //action(message);
- }
- }
- this.alredyPublishs=[];
- this.subConnected=false;
- }
- publish(channel,message)
- {
- let action=()=>{
- let pub=redisPubConn;
- pub.publish(channel,message);
- console.log("发布消息:channel:"+channel+",message:"+message);
- };
- if(this.subConnected===false)
- {
- this.alredyPublishs.push(action);
- }
- else{
- action();
- }
- }
- registerHandlers(channel,action)
- {
- var actions=this.handlers.get(channel)||new Set();
- actions.add(action);
- this.handlers.set(channel,actions);
- }
- subscribe(channel)
- {
- let self=this;
- this.sub.subscribe(channel,function (err,reply) {
- if(err){
- log.error(err);
- }
- self.subConnected=true;
- for(let publish of self.alredyPublishs){
- publish();
- }
- console.log("订阅成功:"+reply);
- });
- this.sub.on("message", function (channel, message) {
- self.subAction(channel,message);
- });
- }
- tearDown()
- {
- this.sub.quit();
- }
- }
- // Expose class
- module.exports = new PubSub();
|