|
@ -0,0 +1,173 @@
|
|
|
package com.yihu.wlyy.statistics.task;
|
|
|
|
|
|
import net.sf.json.JSONArray;
|
|
|
import net.sf.json.JSONObject;
|
|
|
import org.slf4j.Logger;
|
|
|
import org.slf4j.LoggerFactory;
|
|
|
import org.springframework.beans.factory.annotation.Autowired;
|
|
|
import org.springframework.beans.factory.annotation.Value;
|
|
|
|
|
|
import java.io.BufferedReader;
|
|
|
import java.io.IOException;
|
|
|
import java.io.InputStreamReader;
|
|
|
import java.io.PrintWriter;
|
|
|
import java.net.HttpURLConnection;
|
|
|
import java.net.URL;
|
|
|
import java.net.URLEncoder;
|
|
|
import java.util.concurrent.LinkedBlockingQueue;
|
|
|
|
|
|
public class PushMsgTask {
|
|
|
|
|
|
private static Logger logger = LoggerFactory.getLogger(PushMsgTask.class);
|
|
|
// 最大容量为50的数组堵塞队列
|
|
|
private static LinkedBlockingQueue<JSONObject> queue = new LinkedBlockingQueue<JSONObject>();
|
|
|
|
|
|
private static PushMsgTask instance;
|
|
|
|
|
|
private static Object lock = new Object();
|
|
|
|
|
|
@Value("${systemConfig.msg_push_server}")
|
|
|
private String msgPushServer;
|
|
|
|
|
|
public static PushMsgTask getInstance() {
|
|
|
synchronized (lock) {
|
|
|
if (instance == null) {
|
|
|
instance = new PushMsgTask();
|
|
|
instance.run();
|
|
|
}
|
|
|
}
|
|
|
return instance;
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* 添加一条推送消息
|
|
|
* @param receiver 接收人
|
|
|
* @param type 消息类型
|
|
|
* @param title 消息标题
|
|
|
* @param msg 消息内容
|
|
|
* @param data 消息数据
|
|
|
*/
|
|
|
public void put(String receiver, String type, String title, String msg, String data) {
|
|
|
try {
|
|
|
JSONObject json = new JSONObject();
|
|
|
json.put("receiver", receiver);
|
|
|
json.put("type", type);
|
|
|
json.put("title", title);
|
|
|
json.put("msg", msg);
|
|
|
json.put("data", data);
|
|
|
queue.put(json);
|
|
|
} catch (Exception e) {
|
|
|
logger.error("添加到消息队列失败!", e);
|
|
|
e.printStackTrace();
|
|
|
}
|
|
|
}
|
|
|
|
|
|
public void put(JSONArray array) {
|
|
|
if (array == null || array.size() == 0) {
|
|
|
return;
|
|
|
}
|
|
|
for (int i = 0; i < array.size(); i++) {
|
|
|
JSONObject json = array.getJSONObject(i);
|
|
|
if (json == null) {
|
|
|
continue;
|
|
|
}
|
|
|
try {
|
|
|
queue.put(json);
|
|
|
} catch (Exception e) {
|
|
|
logger.error("批量添加到消息队列失败!", e);
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
|
|
|
private void run() {
|
|
|
new Thread(new ConsumerTask()).start();
|
|
|
}
|
|
|
|
|
|
// 消费者
|
|
|
class ConsumerTask implements Runnable {
|
|
|
@Override
|
|
|
public void run() {
|
|
|
try {
|
|
|
while (true) {
|
|
|
// 如果queue为空,则当前线程会堵塞,直到有新数据加入
|
|
|
JSONObject json = queue.take();
|
|
|
// 推送平台消息
|
|
|
String receiver = json.has("receiver") ? json.getString("receiver") : "";
|
|
|
String type = json.has("type") ? json.getString("type") : "";
|
|
|
String title = json.has("title") ? json.getString("title") : "";
|
|
|
String msg = json.has("msg") ? json.getString("msg") : "";
|
|
|
String data = json.has("data") ? json.getString("data") : "";
|
|
|
boolean res = pushMessage(receiver, type, title, msg, data);
|
|
|
if (res) {
|
|
|
logger.info("消息推送成功!");
|
|
|
} else {
|
|
|
logger.error("消息推送失败!");
|
|
|
}
|
|
|
}
|
|
|
} catch (Exception ex) {
|
|
|
ex.printStackTrace();
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* 消息推送
|
|
|
*
|
|
|
* @param receiver 消息接收人
|
|
|
* @param msgType 消息类型
|
|
|
* @param title 消息标题
|
|
|
* @param msg 消息内容
|
|
|
* @param data 消息数据
|
|
|
*/
|
|
|
public static boolean pushMessage(String receiver, String msgType, String title, String msg, String data) {
|
|
|
PrintWriter out = null;
|
|
|
BufferedReader in = null;
|
|
|
HttpURLConnection conn = null;
|
|
|
try {
|
|
|
//String server = getInstance().systemConfig.getMsgPushServer();
|
|
|
String server = "http://127.0.0.1:3000/system/sendmsg.im";
|
|
|
String url = server + "?to_uid=" + receiver + "&content=" + URLEncoder.encode(msg, "UTF-8") + "&type=" + msgType + "&title=" + URLEncoder.encode(title, "UTF-8") + "&data=" + data;
|
|
|
URL realUrl = new URL(url);
|
|
|
|
|
|
// 打开和URL之间的连接
|
|
|
conn = (HttpURLConnection) realUrl.openConnection();
|
|
|
conn.setRequestMethod("GET");
|
|
|
conn.setDoOutput(true);
|
|
|
conn.setDoInput(true);
|
|
|
conn.setUseCaches(false);
|
|
|
conn.setRequestProperty("Content-Type", "application/json");
|
|
|
|
|
|
// 读取返回内容
|
|
|
StringBuffer buffer = new StringBuffer();
|
|
|
BufferedReader br = new BufferedReader(new InputStreamReader(conn.getInputStream(), "UTF-8"));
|
|
|
String temp;
|
|
|
while ((temp = br.readLine()) != null) {
|
|
|
buffer.append(temp);
|
|
|
buffer.append("\n");
|
|
|
}
|
|
|
System.out.println(buffer.toString());
|
|
|
JSONObject json = new JSONObject().fromObject(buffer.toString());
|
|
|
if (json.getInt("errno") == 0) {
|
|
|
// 成功
|
|
|
return true;
|
|
|
} else {
|
|
|
// 失败
|
|
|
return false;
|
|
|
}
|
|
|
} catch (Exception e) {
|
|
|
logger.error("push message error:", e);
|
|
|
} finally {
|
|
|
try {
|
|
|
if (out != null) {
|
|
|
out.close();
|
|
|
}
|
|
|
if (in != null) {
|
|
|
in.close();
|
|
|
}
|
|
|
} catch (IOException ex) {
|
|
|
ex.printStackTrace();
|
|
|
}
|
|
|
}
|
|
|
return false;
|
|
|
}
|
|
|
}
|