Producer.java 2.5 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071
  1. package com.yihu.quota.kafka;
  2. import org.apache.kafka.clients.producer.ProducerConfig;
  3. import org.apache.kafka.common.serialization.StringSerializer;
  4. import org.slf4j.Logger;
  5. import org.slf4j.LoggerFactory;
  6. import org.springframework.kafka.core.DefaultKafkaProducerFactory;
  7. import org.springframework.kafka.core.KafkaTemplate;
  8. import org.springframework.kafka.core.ProducerFactory;
  9. import org.springframework.stereotype.Component;
  10. import org.yaml.snakeyaml.Yaml;
  11. import java.io.FileInputStream;
  12. import java.net.URL;
  13. import java.util.HashMap;
  14. import java.util.Map;
  15. /**
  16. * @author janseny
  17. * @date 2018/9/14
  18. */
  19. @Component
  20. public class Producer {
  21. private static final Logger logger = LoggerFactory.getLogger(Producer.class);
  22. public static String sepTopic = "sep-hbase-data";
  23. public ProducerFactory<String, String> producerFactory() {
  24. String kafkaBrokerAddress = "";
  25. try {
  26. Yaml yaml = new Yaml();
  27. URL url = Producer.class.getClassLoader().getResource("application.yml");
  28. if (url != null) {
  29. Map map = (Map) yaml.load(new FileInputStream(url.getFile()));
  30. Map map2 = (Map) map.get("kafka");
  31. Map map3 = (Map) map2.get("broker");
  32. kafkaBrokerAddress = map3.get("address").toString();
  33. }
  34. } catch (Exception e) {
  35. e.printStackTrace();
  36. }
  37. Map<String, Object> props = new HashMap<>();
  38. props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBrokerAddress);
  39. props.put(ProducerConfig.RETRIES_CONFIG, 0);
  40. props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
  41. props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
  42. props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
  43. props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
  44. return new DefaultKafkaProducerFactory<>(props);
  45. }
  46. public boolean sendMessage(String topic, String message) {
  47. try {
  48. if (logger.isInfoEnabled()) {
  49. logger.info("send Message success.");
  50. }
  51. KafkaTemplate<String, String> kafkaTemplate = new KafkaTemplate<>(producerFactory());
  52. kafkaTemplate.send(topic, message);
  53. return true;
  54. } catch (Exception e) {
  55. if (logger.isErrorEnabled()) {
  56. logger.error("send Message fail." + "topic:" + topic + ",message:" + message + "error:" + e.getMessage(), e);
  57. }
  58. return false;
  59. }
  60. }
  61. }