1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071 |
- package com.yihu.quota.kafka;
- import org.apache.kafka.clients.producer.ProducerConfig;
- import org.apache.kafka.common.serialization.StringSerializer;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.kafka.core.DefaultKafkaProducerFactory;
- import org.springframework.kafka.core.KafkaTemplate;
- import org.springframework.kafka.core.ProducerFactory;
- import org.springframework.stereotype.Component;
- import org.yaml.snakeyaml.Yaml;
- import java.io.FileInputStream;
- import java.net.URL;
- import java.util.HashMap;
- import java.util.Map;
- /**
- * @author janseny
- * @date 2018/9/14
- */
- @Component
- public class Producer {
- private static final Logger logger = LoggerFactory.getLogger(Producer.class);
- public static String sepTopic = "sep-hbase-data";
- public ProducerFactory<String, String> producerFactory() {
- String kafkaBrokerAddress = "";
- try {
- Yaml yaml = new Yaml();
- URL url = Producer.class.getClassLoader().getResource("application.yml");
- if (url != null) {
- Map map = (Map) yaml.load(new FileInputStream(url.getFile()));
- Map map2 = (Map) map.get("kafka");
- Map map3 = (Map) map2.get("broker");
- kafkaBrokerAddress = map3.get("address").toString();
- }
- } catch (Exception e) {
- e.printStackTrace();
- }
- Map<String, Object> props = new HashMap<>();
- props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBrokerAddress);
- props.put(ProducerConfig.RETRIES_CONFIG, 0);
- props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
- props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
- props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
- props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
- return new DefaultKafkaProducerFactory<>(props);
- }
- public boolean sendMessage(String topic, String message) {
- try {
- if (logger.isInfoEnabled()) {
- logger.info("send Message success.");
- }
- KafkaTemplate<String, String> kafkaTemplate = new KafkaTemplate<>(producerFactory());
- kafkaTemplate.send(topic, message);
- return true;
- } catch (Exception e) {
- if (logger.isErrorEnabled()) {
- logger.error("send Message fail." + "topic:" + topic + ",message:" + message + "error:" + e.getMessage(), e);
- }
- return false;
- }
- }
- }
|