KafkaProducerConfig.java 1.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748
  1. package com.yihu.ehr.analysis.config.kafka;
  2. import org.apache.kafka.clients.producer.ProducerConfig;
  3. import org.apache.kafka.common.serialization.StringSerializer;
  4. import org.springframework.beans.factory.annotation.Value;
  5. import org.springframework.context.annotation.Bean;
  6. import org.springframework.context.annotation.Configuration;
  7. import org.springframework.kafka.annotation.EnableKafka;
  8. import org.springframework.kafka.core.DefaultKafkaProducerFactory;
  9. import org.springframework.kafka.core.KafkaTemplate;
  10. import org.springframework.kafka.core.ProducerFactory;
  11. import java.util.HashMap;
  12. import java.util.Map;
  13. /**
  14. * Created by Administrator on 2017/2/6.
  15. */
  16. @Configuration
  17. @EnableKafka
  18. public class KafkaProducerConfig {
  19. @Value("${kafka.broker.address}")
  20. private String brokerAddress;
  21. @Bean
  22. public ProducerFactory<String, String> producerFactory() {
  23. return new DefaultKafkaProducerFactory<>(producerConfigs());
  24. }
  25. @Bean
  26. public Map<String, Object> producerConfigs() {
  27. Map<String, Object> props = new HashMap<>();
  28. props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.brokerAddress);
  29. props.put(ProducerConfig.RETRIES_CONFIG, 0);
  30. props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
  31. props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
  32. props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
  33. props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
  34. props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
  35. return props;
  36. }
  37. @Bean
  38. public KafkaTemplate<String, String> kafkaTemplate() {
  39. return new KafkaTemplate<String, String>(producerFactory());
  40. }
  41. }