CamelStartBoot.java 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132
  1. package com.yihu.hos.broker.services.camel;
  2. import com.yihu.hos.broker.common.log.TracerFormatter;
  3. import com.yihu.hos.broker.services.BrokerServerService;
  4. import com.yihu.hos.broker.util.lang.DynamicClassLoader;
  5. import com.yihu.hos.core.log.Logger;
  6. import com.yihu.hos.core.log.LoggerFactory;
  7. import org.apache.camel.CamelContext;
  8. import org.apache.camel.Exchange;
  9. import org.apache.camel.builder.RouteBuilder;
  10. import org.apache.camel.processor.interceptor.DefaultTraceFormatter;
  11. import org.apache.camel.processor.interceptor.Tracer;
  12. import org.springframework.beans.factory.annotation.Autowired;
  13. import org.springframework.beans.factory.config.AutowireCapableBeanFactory;
  14. import org.springframework.stereotype.Component;
  15. import java.util.concurrent.SynchronousQueue;
  16. /**
  17. * @author Airhead
  18. * @since 2016/12/9.
  19. */
  20. @Component
  21. public class CamelStartBoot {
  22. private static Logger logger = LoggerFactory.getLogger(CamelStartBoot.class);
  23. private BrokerServerService brokerServerService;
  24. private CamelContext context = SystemCamelContext.getContext();
  25. @Autowired
  26. private AutowireCapableBeanFactory capableBeanFactory;
  27. @Autowired
  28. public void setBrokerServerService(BrokerServerService brokerServerService) {
  29. this.brokerServerService = brokerServerService;
  30. }
  31. public void start() {
  32. logger.info("Apache Camel Context 启动...");
  33. try {
  34. context.setStreamCaching(true); //确保outBody可以消费多次。参考链接:http://camel.apache.org/why-is-my-message-body-empty.html
  35. context.setUseMDCLogging(true);
  36. Tracer tracer = new Tracer();
  37. tracer.setTraceOutExchanges(true);
  38. DefaultTraceFormatter formatter = new TracerFormatter();
  39. formatter.setShowHeaders(true);
  40. formatter.setShowBody(true);
  41. formatter.setShowBodyType(true);
  42. formatter.setShowOutHeaders(true);
  43. formatter.setShowOutBody(true);
  44. formatter.setShowOutBodyType(true);
  45. tracer.setFormatter(formatter);
  46. context.addInterceptStrategy(tracer);
  47. context.setTracing(true);
  48. context.getGlobalOptions().put(Exchange.LOG_DEBUG_BODY_STREAMS, "true");
  49. context.start();
  50. logger.info("Apache Camel Context 启动完成...");
  51. brokerServerService.login();
  52. } catch (Exception e) {
  53. e.printStackTrace();
  54. logger.error("Apache Camel Context 启动失败。");
  55. }
  56. }
  57. /**
  58. * serveProcessor 创建Processor实例
  59. */
  60. public void serveProcessor() {
  61. try {
  62. SynchronousQueue<String> processorQueue = SystemCamelContext.getProcessorQueue();
  63. String className;
  64. while ((className = processorQueue.take()) != null) {
  65. try {
  66. DynamicClassLoader dynamicClassLoader = new DynamicClassLoader(SystemCamelContext.getResource(this).getPath());
  67. Class<?> processorClass = dynamicClassLoader.loadClass(className);
  68. if (processorClass != null) {
  69. Object o = processorClass.newInstance();
  70. capableBeanFactory.autowireBean(o);
  71. }
  72. } catch (Exception e) {
  73. e.printStackTrace();
  74. logger.error("serveProcessor----加载数据Class异常11。。");
  75. }
  76. logger.info(className);
  77. }
  78. } catch (Exception e) {
  79. e.printStackTrace();
  80. logger.error("serveProcessor----加载数据Class异常11。。");
  81. }
  82. }
  83. /**
  84. * serveRouter 创建Router实例
  85. */
  86. public void serveRouter() {
  87. try {
  88. SynchronousQueue<String> routerQueue = SystemCamelContext.getRouterQueue();
  89. String className;
  90. while ((className = routerQueue.take()) != null) {
  91. try {
  92. DynamicClassLoader dynamicClassLoader = new DynamicClassLoader(SystemCamelContext.getResource(this).getPath());
  93. Class<RouteBuilder> routeBuilderClass = (Class<RouteBuilder>) dynamicClassLoader.loadClass(className);
  94. if (routeBuilderClass != null) {
  95. RouteBuilder routeBuilder = routeBuilderClass.newInstance();
  96. capableBeanFactory.autowireBean(routeBuilder);
  97. context.addRoutes(routeBuilder);
  98. }
  99. } catch (Exception e) {
  100. e.printStackTrace();
  101. logger.error("serveRouter----加载数据Class异常11。");
  102. }
  103. }
  104. } catch (Exception e) {
  105. e.printStackTrace();
  106. logger.error("serveRouter----加载数据Class异常22。");
  107. }
  108. }
  109. public void shutdown() {
  110. brokerServerService.logout();
  111. }
  112. }