Bladeren bron

代码修改

yeshijie 7 jaren geleden
bovenliggende
commit
8190b11795
4 gewijzigde bestanden met toevoegingen van 171 en 177 verwijderingen
  1. 3 3
      src/server/app.js
  2. 33 39
      src/server/models/client/app.client.js
  3. 131 131
      src/server/models/redis/pubSub.js
  4. 4 4
      src/server/models/sessions/sessions.js

+ 3 - 3
src/server/app.js

@ -23,7 +23,7 @@ let UrlInitializer = require('./endpoints/url.initializer');
let JobInitializer = require('./models/schedule/job.initializer');
//消息订阅
let pubSub = require('./models/redis/pubSub.js');
// let pubSub = require('./models/redis/pubSub.js');
// initialize express application
let app = express();
@ -117,5 +117,5 @@ if(!server.address()){
//JobInitializer.init();
pubSub.registerHandlers(config.subChannel,msg=> console.log(msg));
pubSub.subscribe(config.subChannel);
// pubSub.registerHandlers(config.subChannel,msg=> console.log(msg));
// pubSub.subscribe(config.subChannel);

+ 33 - 39
src/server/models/client/app.client.js

@ -158,45 +158,39 @@ class AppClient extends RedisModel {
                    ModelUtil.logError("Get user app status failed", err);
                    return;
                }
                var session_type = isNaN(Number(sessionType))?sessionType:Number(sessionType);
                if(session_type==SESSION_TYPES.SYSTEM||
                    session_type==SESSION_TYPES.MUC||
                    session_type==SESSION_TYPES.P2P||
                    session_type==SESSION_TYPES.GROUP||
                    session_type==SESSION_TYPES.DISCUSSION||
                    session_type==SESSION_TYPES.PRESCRIPTION){
                    //是否发送消息
                    MessageNoticeSettingRepo.findOne(targetId,'1',function (err,res) {
                        if(res&&res.length>0){
                            let master_switch = res[0].master_switch;
                            let im_switch = res[0].im_switch;
                            let family_topic_switch = res[0].family_topic_switch;
                            if(master_switch==0||im_switch==0){
                                ModelUtil.logError(targetId+"-关闭im消息,消息id", message.id);
                            }
                            // else if(session_type==SESSION_TYPES.MUC&&family_topic_switch==1){
                            //     SignFamilyRepo.isHealthDoctor(message.session_id,targetId,function (err,res) {
                            //         if(res&&res.length==0){
                            //             ModelUtil.logError("全科医生:"+targetId+"-关闭im消息,消息id", message.id);
                            //         }else{
                            //             AppClient.sendNotice(targetId, message, sessionType,badge,userStatus)
                            //         }
                            //     })
                            // }
                            else{
                                AppClient.sendNotice(targetId, message, sessionType,badge,userStatus)
                            }
                        }
                    });
                }else{
                    AppClient.sendNotice(targetId, message, sessionType,badge,userStatus)
                }
                // let pc_doctorClient = clientCache.findByIdAndType("pc_"+targetId,SOCKET_TYPES.PC_DOCTOR);
                // if(pc_doctorClient){
                //     log.warn("User's pc is online, user id: " + targetId + ", we cannot send getui.");
                //     return;
                AppClient.sendNotice(targetId, message, sessionType,badge,userStatus);
                // var session_type = isNaN(Number(sessionType))?sessionType:Number(sessionType);
                // if(session_type==SESSION_TYPES.SYSTEM||
                //     session_type==SESSION_TYPES.MUC||
                //     session_type==SESSION_TYPES.P2P||
                //     session_type==SESSION_TYPES.GROUP||
                //     session_type==SESSION_TYPES.DISCUSSION||
                //     session_type==SESSION_TYPES.PRESCRIPTION){
                //     //是否发送消息
                //     MessageNoticeSettingRepo.findOne(targetId,'1',function (err,res) {
                //         if(res&&res.length>0){
                //             let master_switch = res[0].master_switch;
                //             let im_switch = res[0].im_switch;
                //             let family_topic_switch = res[0].family_topic_switch;
                //             if(master_switch==0||im_switch==0){
                //                 ModelUtil.logError(targetId+"-关闭im消息,消息id", message.id);
                //             }
                //             // else if(session_type==SESSION_TYPES.MUC&&family_topic_switch==1){
                //             //     SignFamilyRepo.isHealthDoctor(message.session_id,targetId,function (err,res) {
                //             //         if(res&&res.length==0){
                //             //             ModelUtil.logError("全科医生:"+targetId+"-关闭im消息,消息id", message.id);
                //             //         }else{
                //             //             AppClient.sendNotice(targetId, message, sessionType,badge,userStatus)
                //             //         }
                //             //     })
                //             // }
                //             else{
                //                 AppClient.sendNotice(targetId, message, sessionType,badge,userStatus);
                //             }
                //         }
                //     });
                // }else{
                //     AppClient.sendNotice(targetId, message, sessionType,badge,userStatus);
                // }
            });

+ 131 - 131
src/server/models/redis/pubSub.js

@ -1,131 +1,131 @@
/*
 * redis发布订阅
 *example:
 * let channel="ryan";
 redis.pubSub.registerHandlers("ryan",msg=> console.log(msg));
 redis.pubSub.subscribe(channel);
 redis.pubSub.publish(channel,"hello from chen");
 */
"use strict";
let configFile = require('../../include/commons').CONFIG_FILE;
let config = require('../../resources/config/' + configFile);
let RedisModel = require('./../redis.model');
let RedisSubClient = require('./redisSubClient');
let RedisPubClient = require('./redisPubClient');
let RedisClient = require('../../repository/redis/redis.client.js');
let redisPubConn = RedisPubClient.redisClient().connection;
let redisSubConn = RedisSubClient.redisClient().connection;
let redis = RedisClient.redisClient().connection;
let Sessions = require('../../models/sessions/sessions');
let WechatClient = require("../client/wechat.client.js");
let WlyySDK = require("../../util/wlyy.sdk");
let AppClient = require("../client/app.client.js");
let Participants = require('../sessions/participants');
let Messages = require('../messages/messages');
const REDIS_KEYS = require('../../include/commons').REDIS_KEYS;
let ObjectUtil = require("../../util/object.util.js");
let logger = require('../../util/log.js');
let SessionRepo = require('../../repository/mysql/session.repo');
let ParticipantRepo = require('../../repository/mysql/participant.repo');
const SESSION_TYPES = require('../../include/commons').SESSION_TYPES;
const SESSION_BUSINESS_TYPE = require('../../include/commons').SESSION_BUSINESS_TYPE;
class PubSub{
    constructor(){
        this.sub=redisSubConn;
        this.handlers=new Map();
        this.subAction=(channle,message)=>{
            let actions= this.handlers.get(channle)||new Set();
            for(let action of actions)
            {
                console.log("接收消息:"+message);
                message = JSON.parse(message);
                if(config.pubSubSwitch){//接收订阅消息处理开关,本地运行和测试库单独运行时防止用户接收消息2次
                    //Sessions.getRedisPushNotification(message);这里不知为什么无法调用这个方法,提示getRedisPushNotification不是方法
                    if (message.targetType=='patient') {
                        if(config.environment!='local'){//pc版接收要发给居民的消息不做处理
                            WechatClient.sendMessage(message.targetUserId, message.targetUserName, message);
                        }
                    } else {
                        if(message.sessionType=="1"){
                            WechatClient.sendReadDoctorByDoctorId(message.targetUserId, message);
                        }
                        //告知医生新消息
                        WechatClient.sendSocketMessageToDoctor(message.targetUserId,message);
                        if(config.environment!='local'){//pc版不推送个推
                            WlyySDK.request(message.targetUserId, '', '', '', '/im/common/message/messages', 'POST', function (err, res) {
                                let count = 0;
                                res =  JSON.parse(res)
                                if (res.status == 200) {
                                    let data = res.data;
                                    count = parseInt(JSON.parse(data.imMsgCount).count) + parseInt(data.system.amount) + parseInt(data.healthIndex.amount) + parseInt(data.sign.amount);
                                }
                                AppClient.sendNotification(message.targetUserId, message,message.sessionType,count);
                            });
                        }
                        //外网pcim通过socket推送
                        WechatClient.sendPcImSocket(message.targetUserId,message,message.sessionType);
                    }
                }
                //action(message);
            }
        }
        this.alredyPublishs=[];
        this.subConnected=false;
    }
    publish(channel,message)
    {
        let action=()=>{
            let pub=redisPubConn;
            pub.publish(channel,message);
            console.log("发布消息:channel:"+channel+",message:"+message);
        };
        if(this.subConnected===false)
        {
            this.alredyPublishs.push(action);
        }
        else{
            action();
        }
    }
    registerHandlers(channel,action)
    {
        var actions=this.handlers.get(channel)||new Set();
        actions.add(action);
        this.handlers.set(channel,actions);
    }
    subscribe(channel)
    {
        let self=this;
        this.sub.subscribe(channel,function (err,reply) {
            if(err){
                log.error(err);
            }
            self.subConnected=true;
            for(let publish of self.alredyPublishs){
                publish();
            }
            console.log("订阅成功:"+reply);
        });
        this.sub.on("message", function (channel, message) {
            self.subAction(channel,message);
        });
    }
    tearDown()
    {
        this.sub.quit();
    }
}
// Expose class
module.exports = new PubSub();
// /*
//  * redis发布订阅
//  *example:
//  * let channel="ryan";
//  redis.pubSub.registerHandlers("ryan",msg=> console.log(msg));
//  redis.pubSub.subscribe(channel);
//
//  redis.pubSub.publish(channel,"hello from chen");
//  */
// "use strict";
//
// let configFile = require('../../include/commons').CONFIG_FILE;
// let config = require('../../resources/config/' + configFile);
//
// let RedisModel = require('./../redis.model');
// let RedisSubClient = require('./redisSubClient');
// let RedisPubClient = require('./redisPubClient');
// let RedisClient = require('../../repository/redis/redis.client.js');
// let redisPubConn = RedisPubClient.redisClient().connection;
// let redisSubConn = RedisSubClient.redisClient().connection;
// let redis = RedisClient.redisClient().connection;
// let Sessions = require('../../models/sessions/sessions');
// let WechatClient = require("../client/wechat.client.js");
// let WlyySDK = require("../../util/wlyy.sdk");
// let AppClient = require("../client/app.client.js");
// let Participants = require('../sessions/participants');
// let Messages = require('../messages/messages');
// const REDIS_KEYS = require('../../include/commons').REDIS_KEYS;
// let ObjectUtil = require("../../util/object.util.js");
// let logger = require('../../util/log.js');
// let SessionRepo = require('../../repository/mysql/session.repo');
// let ParticipantRepo = require('../../repository/mysql/participant.repo');
//
// const SESSION_TYPES = require('../../include/commons').SESSION_TYPES;
// const SESSION_BUSINESS_TYPE = require('../../include/commons').SESSION_BUSINESS_TYPE;
//
// class PubSub{
//     constructor(){
//         this.sub=redisSubConn;
//         this.handlers=new Map();
//
//         this.subAction=(channle,message)=>{
//             let actions= this.handlers.get(channle)||new Set();
//             for(let action of actions)
//             {
//                 console.log("接收消息:"+message);
//                 message = JSON.parse(message);
//
//                 if(config.pubSubSwitch){//接收订阅消息处理开关,本地运行和测试库单独运行时防止用户接收消息2次
//                     //Sessions.getRedisPushNotification(message);这里不知为什么无法调用这个方法,提示getRedisPushNotification不是方法
//                     if (message.targetType=='patient') {
//                         if(config.environment!='local'){//pc版接收要发给居民的消息不做处理
//                             WechatClient.sendMessage(message.targetUserId, message.targetUserName, message);
//                         }
//                     } else {
//                         if(message.sessionType=="1"){
//                             WechatClient.sendReadDoctorByDoctorId(message.targetUserId, message);
//                         }
//                         //告知医生新消息
//                         WechatClient.sendSocketMessageToDoctor(message.targetUserId,message);
//                         if(config.environment!='local'){//pc版不推送个推
//                             WlyySDK.request(message.targetUserId, '', '', '', '/im/common/message/messages', 'POST', function (err, res) {
//                                 let count = 0;
//                                 res =  JSON.parse(res)
//                                 if (res.status == 200) {
//                                     let data = res.data;
//                                     count = parseInt(JSON.parse(data.imMsgCount).count) + parseInt(data.system.amount) + parseInt(data.healthIndex.amount) + parseInt(data.sign.amount);
//                                 }
//
//                                 AppClient.sendNotification(message.targetUserId, message,message.sessionType,count);
//                             });
//                         }
//                         //外网pcim通过socket推送
//                         WechatClient.sendPcImSocket(message.targetUserId,message,message.sessionType);
//                     }
//                 }
//                 //action(message);
//             }
//         }
//         this.alredyPublishs=[];
//         this.subConnected=false;
//     }
//
//     publish(channel,message)
//     {
//         let action=()=>{
//             let pub=redisPubConn;
//             pub.publish(channel,message);
//             console.log("发布消息:channel:"+channel+",message:"+message);
//         };
//         if(this.subConnected===false)
//         {
//             this.alredyPublishs.push(action);
//         }
//         else{
//             action();
//         }
//     }
//     registerHandlers(channel,action)
//     {
//         var actions=this.handlers.get(channel)||new Set();
//         actions.add(action);
//         this.handlers.set(channel,actions);
//     }
//     subscribe(channel)
//     {
//         let self=this;
//         this.sub.subscribe(channel,function (err,reply) {
//             if(err){
//                 log.error(err);
//             }
//             self.subConnected=true;
//             for(let publish of self.alredyPublishs){
//                 publish();
//             }
//             console.log("订阅成功:"+reply);
//         });
//
//         this.sub.on("message", function (channel, message) {
//             self.subAction(channel,message);
//         });
//     }
//
//     tearDown()
//     {
//         this.sub.quit();
//     }
// }
//
// // Expose class
// module.exports = new PubSub();

+ 4 - 4
src/server/models/sessions/sessions.js

@ -26,7 +26,7 @@ let logger = require('../../util/log.js');
let mongoose = require('mongoose');
let async = require("async");
let log = require("../../util/log.js");
let pubSub = require("../redis/pubSub.js");
// let pubSub = require("../redis/pubSub.js");
const REDIS_KEYS = require('../../include/commons').REDIS_KEYS;
const SESSION_TYPES = require('../../include/commons').SESSION_TYPES;
@ -1643,9 +1643,9 @@ class Sessions extends RedisModel {
                message.targetType = 'doctor';
            }
            //redis发布消息
            if(config.pubSubSwitch) {//接收订阅消息处理开关,本地运行和测试库单独运行时防止用户接收消息2次
                pubSub.publish(config.pubChannel,JSON.stringify(message));
            }
            // if(config.pubSubSwitch) {//接收订阅消息处理开关,本地运行和测试库单独运行时防止用户接收消息2次
            //     pubSub.publish(config.pubChannel,JSON.stringify(message));
            // }
        });
    }