In this article we integrate Kafka, a commonly used message queue (MQ). I won't repeat what message queues are for here; see the earlier articles in this series.
Rather than using a configuration file (application-dev.properties), in this article I wire up the Kafka producer and consumer configuration by hand, which is more flexible.
Note: this builds on demo-springboot.
1. Open pom.xml and add the following:
```xml
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.1.4.RELEASE</version>
</dependency>
```
This pulls in the spring-kafka dependency (the latest version at the time of writing).
2. Write the producer configuration: KafkaProducerConfig
```java
package com.example.demo.config;

import com.google.common.collect.Maps;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

import java.util.Map;

/**
 * @author xiaofeng
 * @version V1.0
 * @title: KafkaProducerConfig.java
 * @package: com.example.demo.config
 * @description: Kafka producer configuration
 * @date 2018/4/2
 */
@Configuration
@EnableKafka
public class KafkaProducerConfig {

    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = Maps.newHashMap();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.0.2.22:9092");
        props.put(ProducerConfig.RETRIES_CONFIG, 0);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 4096);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 40960);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }

    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}
```
3. Write the consumer configuration: KafkaConsumerConfig
```java
package com.example.demo.config;

import com.google.common.collect.Maps;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;

import java.util.Map;

/**
 * @author xiaofeng
 * @version V1.0
 * @title: KafkaConsumerConfig.java
 * @package: com.example.demo.config
 * @description: Kafka consumer configuration
 * @date 2018/4/2
 */
@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(3);
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }

    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    public Map<String, Object> consumerConfigs() {
        Map<String, Object> propsMap = Maps.newHashMap();
        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.0.2.22:9092");
        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
        propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
        propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
        propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        return propsMap;
    }

    @Bean
    public KafkaProperties.Listener listener() {
        return new KafkaProperties.Listener();
    }
}
```
Those are the producer and consumer configurations. I haven't explained each option in detail here; interested readers can look them up on their own.
4. Write the producer
```java
package com.example.demo.kafka.sender;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

/**
 * @author xiaofeng
 * @version V1.0
 * @title: TestKafkaSender.java
 * @package: com.example.demo.kafka.sender
 * @description: Kafka producer
 * @date 2018/4/2
 */
@Component
public class TestKafkaSender {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Value("${kafka.test.topic}")
    String testTopic;

    public void sendTest(String msg) {
        kafkaTemplate.send(testTopic, msg);
    }
}
```
5. Write the consumer
```java
package com.example.demo.kafka.consumer;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

/**
 * @author xiaofeng
 * @version V1.0
 * @title: TestKafkaConsumer.java
 * @package: com.example.demo.kafka.consumer
 * @description: Kafka consumer
 * @date 2018/4/2
 */
@Component
public class TestKafkaConsumer {

    private final Logger logger = LoggerFactory.getLogger(getClass());

    @KafkaListener(topics = {"${kafka.test.topic}"})
    public void consumer(String message) {
        logger.info("message = " + message);
    }
}
```
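Note that both the sender and the listener resolve the topic name from the `kafka.test.topic` property, which the post never defines. Presumably it lives in the application properties file; something like this (the topic name `test-topic` is my own placeholder):

```properties
# assumed entry in application-dev.properties; topic name is illustrative
kafka.test.topic=test-topic
```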
Finally, we can test it by writing a unit test class:
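The original post doesn't show the test class itself, so here is a minimal sketch of what it might look like. The class and method names are my own, and I'm assuming the JUnit 4 style typical of Spring Boot demos from this era; it needs the Kafka broker at 10.0.2.22:9092 to be reachable, so it can't run standalone.

```java
package com.example.demo;

import com.example.demo.kafka.sender.TestKafkaSender;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest
public class KafkaSendTest {

    @Autowired
    private TestKafkaSender sender;

    @Test
    public void sendTest() throws InterruptedException {
        // publish a message to the configured test topic
        sender.sendTest("hello kafka");
        // give the listener container a moment to poll and log the message
        // before the application context shuts down
        Thread.sleep(3000);
    }
}
```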
Then check the console yourself to see whether the consumer received the data! (I don't know about you, but mine did ^_^)