PushMsgTask.java 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191
  1. package com.yihu.wlyy.statistics.task;
  2. import com.yihu.wlyy.statistics.util.http.HttpResponse;
  3. import net.sf.json.JSONArray;
  4. import net.sf.json.JSONObject;
  5. import org.apache.http.NameValuePair;
  6. import org.apache.http.client.config.RequestConfig;
  7. import org.apache.http.client.entity.UrlEncodedFormEntity;
  8. import org.apache.http.client.methods.CloseableHttpResponse;
  9. import org.apache.http.client.methods.HttpPost;
  10. import org.apache.http.impl.client.CloseableHttpClient;
  11. import org.apache.http.impl.client.HttpClients;
  12. import org.apache.http.message.BasicNameValuePair;
  13. import org.apache.http.util.EntityUtils;
  14. import org.slf4j.Logger;
  15. import org.slf4j.LoggerFactory;
  16. import java.util.ArrayList;
  17. import java.util.List;
  18. import java.util.concurrent.LinkedBlockingQueue;
  19. public class PushMsgTask {
  20. public static String url="";
  21. private static Logger logger = LoggerFactory.getLogger(PushMsgTask.class);
  22. // 最大容量为50的数组堵塞队列
  23. private static LinkedBlockingQueue<JSONObject> queue = new LinkedBlockingQueue<JSONObject>();
  24. private static PushMsgTask instance;
  25. private static Object lock = new Object();
  26. public static PushMsgTask getInstance() {
  27. synchronized (lock) {
  28. if (instance == null) {
  29. instance = new PushMsgTask();
  30. instance.run();
  31. }
  32. }
  33. return instance;
  34. }
  35. /**
  36. * 添加一条推送消息
  37. * @param receiver 接收人
  38. * @param type 消息类型
  39. * @param title 消息标题
  40. * @param msg 消息内容
  41. * @param data 消息数据
  42. */
  43. public void put(String receiver, String type, String title, String msg, String data) {
  44. try {
  45. JSONObject json = new JSONObject();
  46. json.put("receiver", receiver);
  47. json.put("type", type);
  48. json.put("title", title);
  49. json.put("msg", msg);
  50. json.put("data", data);
  51. queue.put(json);
  52. } catch (Exception e) {
  53. logger.error("添加到消息队列失败!", e);
  54. e.printStackTrace();
  55. }
  56. }
  57. public void put(JSONArray array) {
  58. if (array == null || array.size() == 0) {
  59. return;
  60. }
  61. for (int i = 0; i < array.size(); i++) {
  62. JSONObject json = array.getJSONObject(i);
  63. if (json == null) {
  64. continue;
  65. }
  66. try {
  67. queue.put(json);
  68. } catch (Exception e) {
  69. logger.error("批量添加到消息队列失败!", e);
  70. }
  71. }
  72. }
  73. private void run() {
  74. new Thread(new ConsumerTask()).start();
  75. }
  76. // 消费者
  77. class ConsumerTask implements Runnable {
  78. @Override
  79. public void run() {
  80. try {
  81. while (true) {
  82. // 如果queue为空,则当前线程会堵塞,直到有新数据加入
  83. JSONObject json = queue.take();
  84. // 推送平台消息
  85. String receiver = json.has("receiver") ? json.getString("receiver") : "";
  86. String type = json.has("type") ? json.getString("type") : "";
  87. String title = json.has("title") ? json.getString("title") : "";
  88. String msg = json.has("msg") ? json.getString("msg") : "";
  89. String data = json.has("data") ? json.getString("data") : "";
  90. boolean res = pushMessage(receiver, type, title, msg, data);
  91. if (res) {
  92. logger.info("消息推送成功!");
  93. } else {
  94. logger.error("消息推送失败!");
  95. }
  96. }
  97. } catch (Exception ex) {
  98. ex.printStackTrace();
  99. }
  100. }
  101. }
  102. /**
  103. * 消息推送
  104. *
  105. * @param receiver 消息接收人
  106. * @param msgType 消息类型
  107. * @param title 消息标题
  108. * @param msg 消息内容
  109. * @param data 消息数据
  110. */
  111. public static boolean pushMessage(String receiver, String msgType, String title, String msg, String data) {
  112. try {
  113. List<NameValuePair> params = new ArrayList<NameValuePair>();
  114. params.add(new BasicNameValuePair("to", receiver));
  115. params.add(new BasicNameValuePair("content", msg));
  116. params.add(new BasicNameValuePair("contentType", msgType));
  117. params.add(new BasicNameValuePair("title", title));
  118. params.add(new BasicNameValuePair("summary", data));
  119. HttpResponse response = post(url, params);
  120. if (response != null && response.getStatusCode() == 200) {
  121. JSONObject json = JSONObject.fromObject(response.getBody());
  122. if (!"200".equals(json.optString("status"))) {
  123. throw new Exception(json.optString("msg"));
  124. }
  125. } else {
  126. throw new Exception("接口调用错误!"+response.getBody());
  127. }
  128. return true;
  129. } catch (Exception e) {
  130. logger.error("push message error:", e);
  131. }
  132. return false;
  133. }
  134. private static HttpResponse post(String url, List<NameValuePair> params) {
  135. HttpResponse re = new HttpResponse();
  136. CloseableHttpResponse response = null;
  137. CloseableHttpClient httpclient = HttpClients.createDefault();
  138. //设置请求信息
  139. try {
  140. RequestConfig requestConfig = RequestConfig.custom().
  141. setAuthenticationEnabled(true).build();
  142. HttpPost request = new HttpPost(url );
  143. request.setEntity(new UrlEncodedFormEntity(params,"UTF-8"));
  144. request.setConfig(requestConfig);
  145. response = httpclient.execute(request);
  146. re.setStatusCode(response.getStatusLine().getStatusCode());
  147. re.setBody(EntityUtils.toString(response.getEntity(), "UTF-8"));
  148. } catch (Exception e) {
  149. re.setStatusCode(201);
  150. re.setBody(e.getMessage());
  151. } finally {
  152. close(httpclient, response);
  153. }
  154. return re;
  155. }
  156. private static void close(CloseableHttpClient httpClient, CloseableHttpResponse response) {
  157. try {
  158. if (httpClient != null) {
  159. httpClient.close();
  160. }
  161. } catch (Exception e) {
  162. System.out.print("关闭httpClient失败");
  163. }
  164. try {
  165. if (response != null) {
  166. response.close();
  167. }
  168. } catch (Exception e) {
  169. System.out.print("关闭response失败");
  170. }
  171. }
  172. }