// ApiRouteBulider.java (5.8 KB)

  1. package camel.demo.jdbcdemo.route;
  2. import org.apache.camel.Exchange;
  3. import org.apache.camel.builder.RouteBuilder;
  4. import org.apache.camel.impl.DefaultCamelContext;
  5. import org.apache.camel.impl.SimpleRegistry;
  6. import org.apache.camel.model.ModelCamelContext;
  7. import org.apache.camel.processor.aggregate.AggregationStrategy;
  8. import org.apache.commons.dbcp.BasicDataSource;
  9. import org.springframework.stereotype.Component;
  10. @Component
  11. public class ApiRouteBulider extends RouteBuilder {
  12. final String url = "jdbc:mysql://172.19.103.57:8066/hos2?useUnicode=true&characterEncoding=utf-8&zeroDateTimeBehavior=convertToNull&transformedBitIsBoolean=true&autoReconnect=true";
  13. public void dataSource1(){
  14. ModelCamelContext context = this.getContext();
  15. BasicDataSource basicDataSource = new BasicDataSource();
  16. basicDataSource.setDriverClassName("com.mysql.jdbc.Driver");
  17. basicDataSource.setUsername("hos");
  18. basicDataSource.setPassword("123456");
  19. basicDataSource.setUrl(url);
  20. SimpleRegistry simpleregistry = new SimpleRegistry();
  21. simpleregistry.put("myDataSource", basicDataSource);
  22. context = new DefaultCamelContext(simpleregistry);
  23. }
  24. @Override
  25. public void configure() throws Exception {
  26. //9.31
  27. // from("quartz://myGroup/testTimer1?cron=0 56/2 14 23 4 ? 2018").routeId("testTimer1")
  28. // .to("bean:jdbcProcessor?method=logOut");
  29. //事件 查询主线
  30. from("direct:patientList").routeId("camel/demo/jdbc")
  31. // .setBody(constant("select * from HDSC02_09 "))
  32. .to("bean:jdbcProcessor?method=setSqlBody")
  33. .to("jdbc:dataSource?outputType=SelectList")
  34. .setHeader("dataSetCode",constant("HDSC02_09"))
  35. // .setHeader("HDSC02_09",simple("${body}"))
  36. // .split(body())
  37. // .wireTap("direct:relationTable1")// 结果消息传递到下一个表
  38. // .wireTap("direct:relationTable2")// 结果消息传递到
  39. // .wireTap("direct:relationTable3")// 结果消息传递到
  40. // .aggregate(header("dataSetCode"))
  41. // .end()
  42. // .process(new JdbcProcessor())
  43. // .wireTap("bean:jdbcProcessor?method=logOut")
  44. ;
  45. // 关联查询表
  46. from("direct:relationTable1").routeId("relationTable1")
  47. .setHeader("dataSetCode",constant("HDSC02_17"))
  48. .setHeader("PATIENT_ID",simple("${body['PATIENT_ID']}"))
  49. .setHeader("HDSD00_01_579",simple("${body['HDSD00_01_579']}"))
  50. .setHeader("HDSD00_01_185",simple("${body['HDSD00_01_185']}"))
  51. // .setBody(constant("select :?PATIENT_ID as PATIENT_ID,:?HDSD00_01_579 as HDSD00_01_579,:?HDSD00_01_185 as HDSD00_01_185,a.* from HDSC02_17 a WHERE a.PATIENT_ID=:?PATIENT_ID,a.HDSD00_01_579=:?HDSD00_01_579"))
  52. .setBody(constant("select a.* from HDSC02_17 a WHERE a.PATIENT_ID=:?PATIENT_ID and a.HDSD00_01_579=:?HDSD00_01_579"))
  53. .wireTap("bean:jdbcProcessor?method=logOut")
  54. .to("jdbc:dataSource?useHeadersAsParameters=true")
  55. // .setHeader("HDSC02_17",simple("${body}"))
  56. // .split(body())
  57. // .to("bean:jdbcProcessor?method=aggreeOpera")
  58. // .end()
  59. ;
  60. // 下一张表2 查询 HDSA00_1
  61. from("direct:relationTable2").routeId("relationTable2")
  62. .setHeader("dataSetCode",constant("HDSA00_1"))
  63. .setHeader("PATIENT_ID",simple("${body['PATIENT_ID']}"))
  64. .wireTap("bean:jdbcProcessor?method=logOut")
  65. .setBody(constant("select a.* from HDSA00_1 a WHERE a.PATIENT_ID=:?PATIENT_ID"))
  66. .to("jdbc:dataSource?useHeadersAsParameters=true")
  67. // .setHeader("HDSA00_1",simple("${body}"))
  68. // .split(body())
  69. ;
  70. // 下一张表3 查询 HDSC02_14
  71. from("direct:relationTable3").routeId("relationTable3")
  72. .setHeader("dataSetCode",constant("HDSC02_14"))
  73. .setHeader("PATIENT_ID",simple("${body['PATIENT_ID']}"))
  74. .setHeader("HDSD00_01_579",simple("${body['HDSD00_01_579']}"))
  75. .setHeader("HDSD00_01_185",simple("${body['HDSD00_01_185']}"))
  76. .wireTap("bean:jdbcProcessor?method=logOut")
  77. .setBody(constant("select a.* from HDSC02_14 a WHERE a.PATIENT_ID=:?PATIENT_ID nad a.HDSD00_01_579=:?HDSD00_01_579"))
  78. .to("jdbc:dataSource?useHeadersAsParameters=true")
  79. // .setHeader("HDSC02_14",simple("${body}"))
  80. // .split(body())
  81. .end()
  82. ;
  83. from("jetty:http://0.0.0.0:9094/demo/jdbc").to("direct:patientList")
  84. .split(body())
  85. .multicast(new MyAggregationStrategy()).to("direct:relationTable1,direct:relationTable2,direct:relationTable3").parallelProcessing()
  86. .end()
  87. .to("direct:upload")
  88. .end();
  89. from("direct:t0").setBody(constant("p1,p2,p3,p4")).log("t0:" + body().toString());
  90. from("direct:ta").log("ta:" + body().toString());
  91. from("direct:tb").log("tb:" + body().toString());
  92. from("direct:tc").log("tc:" + body().toString());
  93. from("direct:upload").log("upload:" + body().toString());
  94. }
  95. public class MyAggregationStrategy implements AggregationStrategy {
  96. @Override
  97. public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
  98. if (oldExchange == null) {
  99. return newExchange;
  100. } else {
  101. String body1 = oldExchange.getIn().getBody(String.class);
  102. String body2 = newExchange.getIn().getBody(String.class);
  103. String merged = (body1 == null) ? body2 : body1 + "," + body2;
  104. oldExchange.getIn().setBody(merged);
  105. return oldExchange;
  106. }
  107. }
  108. }
  109. }