ApiProcessor.java 1.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657
  1. package apisync.processor;
  2. import org.apache.camel.Exchange;
  3. import org.apache.camel.ExchangePattern;
  4. import org.apache.camel.Message;
  5. import org.apache.camel.Processor;
  6. import org.apache.camel.http.common.HttpMessage;
  7. import java.io.ByteArrayOutputStream;
  8. import java.io.IOException;
  9. import java.io.InputStream;
  10. /**
  11. * @author Airhead
  12. * @since 2016-11-13
  13. */
  14. public class ApiProcessor implements Processor {
  15. public void process(Exchange exchange) throws Exception {
  16. // 因为很明确消息格式是http的,所以才使用这个类
  17. // 否则还是建议使用org.apache.camel.Message这个抽象接口
  18. HttpMessage message = (HttpMessage) exchange.getIn();
  19. InputStream bodyStream = (InputStream) message.getBody();
  20. String inputContext = this.analysisMessage(bodyStream);
  21. bodyStream.close();
  22. // 存入到exchange的out区域
  23. if (exchange.getPattern() == ExchangePattern.InOut) {
  24. Message outMessage = exchange.getOut();
  25. outMessage.setBody("hello.123," + inputContext);
  26. }
  27. }
  28. /**
  29. * 从stream中分析字符串内容
  30. *
  31. * @param bodyStream
  32. * @return
  33. */
  34. private String analysisMessage(InputStream bodyStream) throws IOException {
  35. if (bodyStream == null) {
  36. return "";
  37. }
  38. ByteArrayOutputStream outStream = new ByteArrayOutputStream();
  39. byte[] contextBytes = new byte[4096];
  40. int realLen;
  41. while ((realLen = bodyStream.read(contextBytes, 0, 4096)) != -1) {
  42. outStream.write(contextBytes, 0, realLen);
  43. }
  44. // 返回从Stream中读取的字串
  45. try {
  46. return new String(outStream.toByteArray(), "UTF-8");
  47. } finally {
  48. outStream.close();
  49. }
  50. }
  51. }