package com.yihu.wlyy.statistics.task; import com.yihu.wlyy.statistics.util.HttpClientUtil; import com.yihu.wlyy.statistics.util.ImUtill; import net.sf.json.JSONArray; import net.sf.json.JSONObject; import org.apache.http.NameValuePair; import org.apache.http.client.config.RequestConfig; import org.apache.http.client.entity.UrlEncodedFormEntity; import org.apache.http.client.methods.CloseableHttpResponse; import org.apache.http.client.methods.HttpPost; import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.impl.client.HttpClients; import org.apache.http.message.BasicNameValuePair; import org.apache.http.util.EntityUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.List; import java.util.concurrent.LinkedBlockingQueue; public class PushMsgTask { public static String url=""; private static Logger logger = LoggerFactory.getLogger(PushMsgTask.class); // 最大容量为50的数组堵塞队列 private static LinkedBlockingQueue queue = new LinkedBlockingQueue(); private static PushMsgTask instance; private static Object lock = new Object(); public static PushMsgTask getInstance() { synchronized (lock) { if (instance == null) { instance = new PushMsgTask(); instance.run(); } } return instance; } /** * 添加一条推送消息 * @param receiver 接收人 * @param type 消息类型 * @param title 消息标题 * @param msg 消息内容 * @param data 消息数据 */ public void put(String receiver, String type, String title, String msg, String data) { try { JSONObject json = new JSONObject(); json.put("receiver", receiver); json.put("type", type); json.put("title", title); json.put("msg", msg); json.put("data", data); queue.put(json); } catch (Exception e) { logger.error("添加到消息队列失败!", e); e.printStackTrace(); } } public void put(JSONArray array) { if (array == null || array.size() == 0) { return; } for (int i = 0; i < array.size(); i++) { JSONObject json = array.getJSONObject(i); if (json == null) { continue; } try { queue.put(json); } catch (Exception e) { logger.error("批量添加到消息队列失败!", e); } } } private void run() { new Thread(new ConsumerTask()).start(); } // 消费者 class ConsumerTask implements Runnable { @Override public void run() { try { while (true) { // 如果queue为空,则当前线程会堵塞,直到有新数据加入 JSONObject json = queue.take(); logger.info("发送前:"+json); // 推送平台消息 String receiver = json.containsKey("receiver") ? json.getString("receiver") : ""; String type = json.containsKey("type") ? json.getString("type") : ""; String title = json.containsKey("title") ? json.getString("title") : ""; String msg = json.containsKey("msg") ? json.getString("msg") : ""; String data = json.containsKey("data") ? json.getString("data") : ""; boolean res = pushMessage(receiver, type, title, msg, data); if (res) { logger.info("消息推送成功!"); } else { logger.error("消息推送失败!"); } } } catch (Exception ex) { ex.printStackTrace(); } } } /** * 消息推送 * * @param receiver 消息接收人 * @param msgType 消息类型 * @param title 消息标题 * @param msg 消息内容 * @param data 消息数据 */ public static boolean pushMessage(String receiver, String msgType, String title, String msg, String data) { try { //List params = new ArrayList(); //params.add(new BasicNameValuePair("to", receiver)); //params.add(new BasicNameValuePair("content", msg)); //params.add(new BasicNameValuePair("contentType", msgType)); //params.add(new BasicNameValuePair("title", title)); //params.add(new BasicNameValuePair("summary", data)); //String response = HttpClientUtil.post(url, params, "UTF-8"); org.json.JSONObject participants = new org.json.JSONObject(); participants.put("system",0); participants.put(receiver,0); org.json.JSONObject sessionObj = ImUtill.createSession(url,participants,"0","系统消息",""); if(sessionObj.getInt("status")==-1){ throw new RuntimeException(sessionObj.getString("message")); } org.json.JSONObject session = sessionObj.getJSONObject("data"); ImUtill.sendImMsg(url,"system","系统",session.getString("id"),"1", msg,msgType); return true; } catch (Exception e) { logger.error("push message error:", e); } return false; } }