博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
springboot + kafka
阅读量:5760 次
发布时间:2019-06-18

本文共 6031 字,大约阅读时间需要 20 分钟。

这片文章中我们来集成下常用的消息队列(MQ)kafka,至于消息队列的作用,在此不再赘述,参考前面的文章。

在该篇文章中我没有采用配置文件的形式(application-dev.properties)配置,而是手动编写的kafkaProduce和kafkaConsumer的配置,这样更灵活。

注:基于demo-springboot

1.打开pom.xml加入以下内容:

org.springframework.kafka
spring-kafka
2.1.4.RELEASE

以上为引入spring-kafka的最新依赖包

2.编写生产者的配置:kafkaProduceConfig

package com.example.demo.config;import com.google.common.collect.Maps;import org.apache.kafka.clients.producer.ProducerConfig;import org.apache.kafka.common.serialization.StringSerializer;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.kafka.annotation.EnableKafka;import org.springframework.kafka.core.DefaultKafkaProducerFactory;import org.springframework.kafka.core.KafkaTemplate;import org.springframework.kafka.core.ProducerFactory;import java.util.Map;/** * @author xiaofeng * @version V1.0 * @title: KafkaProducerConfig.java * @package: com.example.demo.config * @description: kafka生产者配置 * @date 2018/4/2 0002 下午 3:49 */@Configuration@EnableKafkapublic class KafkaProducerConfig {    public Map
producerConfigs() { Map
props = Maps.newHashMap(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.0.2.22:9092"); props.put(ProducerConfig.RETRIES_CONFIG, 0); props.put(ProducerConfig.BATCH_SIZE_CONFIG, 4096); props.put(ProducerConfig.LINGER_MS_CONFIG, 1); props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 40960); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); return props; } public ProducerFactory
producerFactory() { return new DefaultKafkaProducerFactory<>(producerConfigs()); } @Bean public KafkaTemplate
kafkaTemplate() { return new KafkaTemplate
(producerFactory()); }}

3.编写消费者的配置:kafkaConsumerConfig

package com.example.demo.config;import com.google.common.collect.Maps;import org.apache.kafka.clients.consumer.ConsumerConfig;import org.apache.kafka.common.serialization.StringDeserializer;import org.springframework.boot.autoconfigure.kafka.KafkaProperties;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.kafka.annotation.EnableKafka;import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;import org.springframework.kafka.config.KafkaListenerContainerFactory;import org.springframework.kafka.core.ConsumerFactory;import org.springframework.kafka.core.DefaultKafkaConsumerFactory;import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;import java.util.Map;/**  * @author xiaofeng  * @version V1.0  * @title: KafkaConfiguration.java  * @package: com.example.demo.config  * @description: kafka消费者配置  * @date 2018/4/2 0002 下午 3:42  */@Configuration@EnableKafkapublic class KafkaConsumerConfig {    @Bean    public KafkaListenerContainerFactory
> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory
factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); factory.setConcurrency(3); factory.getContainerProperties().setPollTimeout(3000); return factory; } public ConsumerFactory
consumerFactory() { return new DefaultKafkaConsumerFactory<>(consumerConfigs()); } public Map
consumerConfigs() { Map
propsMap = Maps.newHashMap(); propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.0.2.22:9092"); propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100"); propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000"); propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group"); propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); return propsMap; } @Bean public KafkaProperties.Listener listener() { return new KafkaProperties.Listener(); }}

以上为kafka生产者和消费者的相关配置,至于各个配置选项我并没有做详细的解释,相关童鞋可以自行查询。

4.编写生产者

package com.example.demo.kafka.sender;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.beans.factory.annotation.Value;import org.springframework.kafka.core.KafkaTemplate;import org.springframework.stereotype.Component;/**  * @author xiaofeng  * @version V1.0  * @title: TestKafkaSender.java  * @package: com.example.demo.kafka.sender  * @description: kafka生产者  * @date 2018/4/2 0002 下午 3:31  */@Componentpublic class TestKafkaSender {    @Autowired    private KafkaTemplate kafkaTemplate;    @Value("${kafka.test.topic}")    String testTopic;    public void sendTest(String msg){        kafkaTemplate.send(testTopic, msg);    }}

5.编写消费者

package com.example.demo.kafka.consumer;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.kafka.annotation.KafkaListener;import org.springframework.stereotype.Component;/** * @author xiaofeng * @version V1.0 * @title: FollowHisOrderConsumer.java * @package: com.example.demo.kafka.consumer * @description: kafka消费者 * @date 2018/4/2 0002 下午 3:31 */@Componentpublic class TestKafkaConsumer {    Logger logger = LoggerFactory.getLogger(getClass());    @KafkaListener(topics = {
"${kafka.test.topic}"}) public void consumer(String message) { logger.info(" message = " + message); System.out.printf("message=" + message); }}

最后我们就可以开始测试了,编写单元测试类:

接下来自己去控制台查看消费者是否已经接收到了相关数据吧!(不知道你有没有收到,反正我是收到了^_^)

转载于:https://www.cnblogs.com/babac/p/9028480.html

你可能感兴趣的文章
javascript静态类型检测工具—Flow
查看>>
MachineLearning-Sklearn——环境搭建
查看>>
node学习之路(二)—— Node.js 连接 MongoDB
查看>>
Goroutine是如何工作的?
查看>>
《深入理解java虚拟机》学习笔记系列——垃圾收集器&内存分配策略
查看>>
TriggerMesh开源用于多云环境的Knative Event Sources
查看>>
GitLab联合DigitalOcean为开源社区提供GitLab CI免费托管
查看>>
通过XAML Islands使Windows桌面应用程序现代化
查看>>
区块链现状:从谨慎和批判性思维看待它(第二部分)
查看>>
苹果公司透露Siri新发音引擎的内部原理
查看>>
GCM 3.0采用类似方式向Android、iOS和Chrome发送消息
查看>>
如何成为一家敏捷银行
查看>>
Oracle在JavaOne上宣布Java EE 8将会延期至2017年底
查看>>
Javascript 深入浅出原型
查看>>
简单之极,搭建属于自己的Data Mining环境(Spark版本)
查看>>
Ruby 2.5.0概览
查看>>
如何通过解决精益问题提高敏捷团队生产力
查看>>
Comment2Wechat —— Typecho 插件
查看>>
Apache下.htaccess文件配置及功能介绍
查看>>
Magento XML cheatsheet
查看>>