package camel.demo.jdbcdemo.route; import org.apache.camel.Exchange; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.impl.DefaultCamelContext; import org.apache.camel.impl.SimpleRegistry; import org.apache.camel.model.ModelCamelContext; import org.apache.camel.processor.aggregate.AggregationStrategy; import org.apache.commons.dbcp.BasicDataSource; import org.springframework.stereotype.Component; @Component public class ApiRouteBulider extends RouteBuilder { final String url = "jdbc:mysql://172.19.103.57:8066/hos2?useUnicode=true&characterEncoding=utf-8&zeroDateTimeBehavior=convertToNull&transformedBitIsBoolean=true&autoReconnect=true"; public void dataSource1(){ ModelCamelContext context = this.getContext(); BasicDataSource basicDataSource = new BasicDataSource(); basicDataSource.setDriverClassName("com.mysql.jdbc.Driver"); basicDataSource.setUsername("hos"); basicDataSource.setPassword("123456"); basicDataSource.setUrl(url); SimpleRegistry simpleregistry = new SimpleRegistry(); simpleregistry.put("myDataSource", basicDataSource); context = new DefaultCamelContext(simpleregistry); } @Override public void configure() throws Exception { //9.31 // from("quartz://myGroup/testTimer1?cron=0 56/2 14 23 4 ? 2018").routeId("testTimer1") // .to("bean:jdbcProcessor?method=logOut"); //事件 查询主线 from("direct:patientList").routeId("camel/demo/jdbc") // .setBody(constant("select * from HDSC02_09 ")) .to("bean:jdbcProcessor?method=setSqlBody") .to("jdbc:dataSource?outputType=SelectList") .setHeader("dataSetCode",constant("HDSC02_09")) // .setHeader("HDSC02_09",simple("${body}")) // .split(body()) // .wireTap("direct:relationTable1")// 结果消息传递到下一个表 // .wireTap("direct:relationTable2")// 结果消息传递到 // .wireTap("direct:relationTable3")// 结果消息传递到 // .aggregate(header("dataSetCode")) // .end() // .process(new JdbcProcessor()) // .wireTap("bean:jdbcProcessor?method=logOut") ; // 关联查询表 from("direct:relationTable1").routeId("relationTable1") .setHeader("dataSetCode",constant("HDSC02_17")) .setHeader("PATIENT_ID",simple("${body['PATIENT_ID']}")) .setHeader("HDSD00_01_579",simple("${body['HDSD00_01_579']}")) .setHeader("HDSD00_01_185",simple("${body['HDSD00_01_185']}")) // .setBody(constant("select :?PATIENT_ID as PATIENT_ID,:?HDSD00_01_579 as HDSD00_01_579,:?HDSD00_01_185 as HDSD00_01_185,a.* from HDSC02_17 a WHERE a.PATIENT_ID=:?PATIENT_ID,a.HDSD00_01_579=:?HDSD00_01_579")) .setBody(constant("select a.* from HDSC02_17 a WHERE a.PATIENT_ID=:?PATIENT_ID and a.HDSD00_01_579=:?HDSD00_01_579")) .wireTap("bean:jdbcProcessor?method=logOut") .to("jdbc:dataSource?useHeadersAsParameters=true") // .setHeader("HDSC02_17",simple("${body}")) // .split(body()) // .to("bean:jdbcProcessor?method=aggreeOpera") // .end() ; // 下一张表2 查询 HDSA00_1 from("direct:relationTable2").routeId("relationTable2") .setHeader("dataSetCode",constant("HDSA00_1")) .setHeader("PATIENT_ID",simple("${body['PATIENT_ID']}")) .wireTap("bean:jdbcProcessor?method=logOut") .setBody(constant("select a.* from HDSA00_1 a WHERE a.PATIENT_ID=:?PATIENT_ID")) .to("jdbc:dataSource?useHeadersAsParameters=true") // .setHeader("HDSA00_1",simple("${body}")) // .split(body()) ; // 下一张表3 查询 HDSC02_14 from("direct:relationTable3").routeId("relationTable3") .setHeader("dataSetCode",constant("HDSC02_14")) .setHeader("PATIENT_ID",simple("${body['PATIENT_ID']}")) .setHeader("HDSD00_01_579",simple("${body['HDSD00_01_579']}")) .setHeader("HDSD00_01_185",simple("${body['HDSD00_01_185']}")) .wireTap("bean:jdbcProcessor?method=logOut") .setBody(constant("select a.* from HDSC02_14 a WHERE a.PATIENT_ID=:?PATIENT_ID nad a.HDSD00_01_579=:?HDSD00_01_579")) .to("jdbc:dataSource?useHeadersAsParameters=true") // .setHeader("HDSC02_14",simple("${body}")) // .split(body()) .end() ; from("jetty:http://0.0.0.0:9094/demo/jdbc").to("direct:patientList") .split(body()) .multicast(new MyAggregationStrategy()).to("direct:relationTable1,direct:relationTable2,direct:relationTable3").parallelProcessing() .end() .to("direct:upload") .end(); from("direct:t0").setBody(constant("p1,p2,p3,p4")).log("t0:" + body().toString()); from("direct:ta").log("ta:" + body().toString()); from("direct:tb").log("tb:" + body().toString()); from("direct:tc").log("tc:" + body().toString()); from("direct:upload").log("upload:" + body().toString()); } public class MyAggregationStrategy implements AggregationStrategy { @Override public Exchange aggregate(Exchange oldExchange, Exchange newExchange) { if (oldExchange == null) { return newExchange; } else { String body1 = oldExchange.getIn().getBody(String.class); String body2 = newExchange.getIn().getBody(String.class); String merged = (body1 == null) ? body2 : body1 + "," + body2; oldExchange.getIn().setBody(merged); return oldExchange; } } } }