package com.yihu.wlyy.statistics.task; import com.yihu.wlyy.statistics.util.http.HttpResponse; import net.sf.json.JSONArray; import net.sf.json.JSONObject; import org.apache.http.NameValuePair; import org.apache.http.client.config.RequestConfig; import org.apache.http.client.entity.UrlEncodedFormEntity; import org.apache.http.client.methods.CloseableHttpResponse; import org.apache.http.client.methods.HttpPost; import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.impl.client.HttpClients; import org.apache.http.message.BasicNameValuePair; import org.apache.http.util.EntityUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.List; import java.util.concurrent.LinkedBlockingQueue; public class PushMsgTask { public static String url=""; private static Logger logger = LoggerFactory.getLogger(PushMsgTask.class); // 最大容量为50的数组堵塞队列 private static LinkedBlockingQueue queue = new LinkedBlockingQueue(); private static PushMsgTask instance; private static Object lock = new Object(); public static PushMsgTask getInstance() { synchronized (lock) { if (instance == null) { instance = new PushMsgTask(); instance.run(); } } return instance; } /** * 添加一条推送消息 * @param receiver 接收人 * @param type 消息类型 * @param title 消息标题 * @param msg 消息内容 * @param data 消息数据 */ public void put(String receiver, String type, String title, String msg, String data) { try { JSONObject json = new JSONObject(); json.put("receiver", receiver); json.put("type", type); json.put("title", title); json.put("msg", msg); json.put("data", data); queue.put(json); } catch (Exception e) { logger.error("添加到消息队列失败!", e); e.printStackTrace(); } } public void put(JSONArray array) { if (array == null || array.size() == 0) { return; } for (int i = 0; i < array.size(); i++) { JSONObject json = array.getJSONObject(i); if (json == null) { continue; } try { queue.put(json); } catch (Exception e) { logger.error("批量添加到消息队列失败!", e); } } } private void run() { new Thread(new ConsumerTask()).start(); } // 消费者 class ConsumerTask implements Runnable { @Override public void run() { try { while (true) { // 如果queue为空,则当前线程会堵塞,直到有新数据加入 JSONObject json = queue.take(); // 推送平台消息 String receiver = json.has("receiver") ? json.getString("receiver") : ""; String type = json.has("type") ? json.getString("type") : ""; String title = json.has("title") ? json.getString("title") : ""; String msg = json.has("msg") ? json.getString("msg") : ""; String data = json.has("data") ? json.getString("data") : ""; boolean res = pushMessage(receiver, type, title, msg, data); if (res) { logger.info("消息推送成功!"); } else { logger.error("消息推送失败!"); } } } catch (Exception ex) { ex.printStackTrace(); } } } /** * 消息推送 * * @param receiver 消息接收人 * @param msgType 消息类型 * @param title 消息标题 * @param msg 消息内容 * @param data 消息数据 */ public static boolean pushMessage(String receiver, String msgType, String title, String msg, String data) { try { List params = new ArrayList(); params.add(new BasicNameValuePair("to", receiver)); params.add(new BasicNameValuePair("content", msg)); params.add(new BasicNameValuePair("contentType", msgType)); params.add(new BasicNameValuePair("title", title)); params.add(new BasicNameValuePair("summary", data)); HttpResponse response = post(url, params); if (response != null && response.getStatusCode() == 200) { JSONObject json = JSONObject.fromObject(response.getBody()); if (!"200".equals(json.optString("status"))) { throw new Exception(json.optString("msg")); } } else { throw new Exception("接口调用错误!"+response.getBody()); } return true; } catch (Exception e) { logger.error("push message error:", e); } return false; } private static HttpResponse post(String url, List params) { HttpResponse re = new HttpResponse(); CloseableHttpResponse response = null; CloseableHttpClient httpclient = HttpClients.createDefault(); //设置请求信息 try { RequestConfig requestConfig = RequestConfig.custom(). setAuthenticationEnabled(true).build(); HttpPost request = new HttpPost(url ); request.setEntity(new UrlEncodedFormEntity(params,"UTF-8")); request.setConfig(requestConfig); response = httpclient.execute(request); re.setStatusCode(response.getStatusLine().getStatusCode()); re.setBody(EntityUtils.toString(response.getEntity(), "UTF-8")); } catch (Exception e) { re.setStatusCode(201); re.setBody(e.getMessage()); } finally { close(httpclient, response); } return re; } private static void close(CloseableHttpClient httpClient, CloseableHttpResponse response) { try { if (httpClient != null) { httpClient.close(); } } catch (Exception e) { System.out.print("关闭httpClient失败"); } try { if (response != null) { response.close(); } } catch (Exception e) { System.out.print("关闭response失败"); } } }