Kafka - 13 Java 客户端实现消费者消费消息
创始人
2024-03-16 18:48:55
0

文章目录

    • 1. 独立消费者案例(订阅主题)
    • 2. 独立消费者案例(订阅分区)
    • 3. 消费者组案例

1. 独立消费者案例(订阅主题)

需求:创建一个独立消费者,消费主题中数据:

[root@hadoop101 kafka_2.12-2.2.1]# bin/kafka-topics.sh --bootstrap-server hadoop101:9092 --create --partitions 3 --replication-factor 3 --topic hh[root@hadoop101 kafka_2.12-2.2.1]# bin/kafka-topics.sh --bootstrap-server hadoop101:9092 --describe --topic hh
Topic:hh    PartitionCount:3    ReplicationFactor:3     Configs:segment.bytes=1073741824
Topic: hh       Partition: 0    Leader: 0       Replicas: 0,1,2 Isr: 2,1,0
Topic: hh       Partition: 1    Leader: 2       Replicas: 2,0,3 Isr: 3,0,2
Topic: hh       Partition: 2    Leader: 3       Replicas: 3,2,1 Isr: 3,1,2

在这里插入图片描述

注意:在消费者 API 代码中必须配置消费者组 id。命令行启动消费者不填写消费者组 id 会被自动填写随机的消费者组 id

public class CustomConsumer {public static void main(String[] args) {Properties properties = new Properties();properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.38.23:9092");properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());// 创建消费者组,组名任意起名都可以properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test-consumer-group");// 创建消费者KafkaConsumer consumer = new KafkaConsumer(properties);// 订阅主题ArrayList topics = new ArrayList<>();topics.add("hh");consumer.subscribe(topics);// 消费数据while (true){ConsumerRecords consumerRecords = consumer.poll(Duration.ofSeconds(1));for (ConsumerRecord consumerRecord : consumerRecords) {System.out.println(consumerRecord.value());}}}
}

Springboot 自定义日志配置关闭Kafka消费者debug日志打印:在resource目录下添加文件 logback.xml 即可。


%d{yyyy-MM-dd HH:mm:ss.SSS}[%thread] %-5level %logger{50} - %msg%n${LOG_HOME}/%d{yyyy-MM-dd}.%i.log50MB30%d{yyyy-MM-dd HH:mm:ss.SSS}[%thread] %-5level %logger{50} - %msg%n

测试生产者发送消息:

在这里插入图片描述

2. 独立消费者案例(订阅分区)

需求:创建一个独立消费者,消费主题 0 号分区的数据。

在这里插入图片描述

① kafka 消费者消费主题0号分区的数据:

public class CustomConsumerPartition {public static void main(String[] args) {Properties properties = new Properties();properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.38.23:9092");properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());// 创建消费者组,组名任意起名都可以properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test-consumer-group");// 创建消费者KafkaConsumer consumer = new KafkaConsumer(properties);// 订阅主题对应的分区ArrayList topicPartitions = new ArrayList<>();topicPartitions.add(new TopicPartition("hh",0));consumer.assign(topicPartitions);// 消费数据while (true){ConsumerRecords consumerRecords = consumer.poll(Duration.ofSeconds(1));for (ConsumerRecord consumerRecord : consumerRecords) {System.out.println(consumerRecord.value());}}}
}

② kafka 生产者向主题的0号分区发送数据:

public class CustomProducerCallbackPartitions {public static void main(String[] args) throws InterruptedException {// kafka生产者属性配置Properties properties = new Properties();properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.38.23:9092");properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());// kafka生产者KafkaProducer kafkaProducer = new KafkaProducer(properties);for(int i=0;i<5;i++){kafkaProducer.send(new ProducerRecord<>("hh" ,0,"","hello,kafka" + i), new Callback() {@Overridepublic void onCompletion(RecordMetadata recordMetadata, Exception exception) {if(exception==null){// 消息发送成功System.out.println("主题"+recordMetadata.topic()+"->"+"分区:"+recordMetadata.partition());}else{// 消息发送失败exception.printStackTrace();}}});Thread.sleep(2);}// 关闭资源kafkaProducer.close();}
}

③ 测试:先启动消费者程序,再启动生产者程序

在这里插入图片描述

3. 消费者组案例

需求:测试同一个主题的分区数据,只能由一个消费者组中的一个消费。

在这里插入图片描述

① 创建3个消费者:复制2份基础消费者的代码,在 IDEA 中同时启动,即可启动同一个消费者组中的3个消费者。

public class CustomConsumer {public static void main(String[] args) {Properties properties = new Properties();properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.38.23:9092");properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());// 创建消费者组,组名任意起名都可以properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test-consumer-group");// 创建消费者KafkaConsumer consumer = new KafkaConsumer(properties);// 订阅主题ArrayList topics = new ArrayList<>();topics.add("hh");consumer.subscribe(topics);// 消费数据while (true){ConsumerRecords consumerRecords = consumer.poll(Duration.ofSeconds(1));for (ConsumerRecord consumerRecord : consumerRecords) {System.out.println(consumerRecord.value());}}}
}

② 生产者发送消息:

public class CustomProducerCallbackPartitions {public static void main(String[] args) throws InterruptedException {// kafka生产者属性配置Properties properties = new Properties();properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.38.23:9092");properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());// 添加自定义分区器// properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"com.hh.producer.MyPartitioner");// kafka生产者KafkaProducer kafkaProducer = new KafkaProducer(properties);for(int i=0;i<50;i++){kafkaProducer.send(new ProducerRecord<>("hh" ,"hello,kafka" + i), new Callback() {@Overridepublic void onCompletion(RecordMetadata recordMetadata, Exception exception) {if(exception==null){// 消息发送成功System.out.println("主题"+recordMetadata.topic()+",发往的分区:"+recordMetadata.partition());}else{// 消息发送失败exception.printStackTrace();}}});Thread.sleep(2);}// 关闭资源kafkaProducer.close();}
}

③ 测试:先启动3个消费者程序,再启动生产者程序

在这里插入图片描述

可以看到发送的50条消息分别被消费者组中的不同消费者消费,他们消费的是不同分区的数据。

相关内容

热门资讯

低空经济持续获政策支持 业绩有... 人民财讯12月26日电,低空经济作为国家战略性新兴产业,持续受到政策支持。 作为万亿级产业,低空经济...
【深圳特区报】从“制度设计图”... 近日,深圳市财政局召开“数据资产全过程管理试点工作培训暨经验交流会”(以下简称“培训交流会”),围绕...
原创 重... 在情感的漩涡中,我们常常看到悲剧的发生。近日,一则令人心痛的消息震动了整个网络——重庆一男子因感情纠...
极氪回应明年车辆质保权益等问题... 【CNMO科技消息】近日,极氪汽车发布《极氪零距离 | 你问我答》公告,就用户关注的车辆质保、免费充...
关于海南育儿补贴制度实施热点问... 户籍刚迁入海南的婴幼儿是否能享受育儿补贴;如何快速了解补贴申领流程……12月25日,海南省新闻办公室...
宋朝的中介制度 走进《清明上河图》中汴河两岸喧嚷的市集,除了林立的店铺与往来的舟车,还有一种身影穿梭其间——他们并非...
《西安市地下水条例》《西安历史... 央广网西安12月26日消息(记者侯凯奇)12月25日,西安市人大常委会召开新闻发布会,正式公布《西安...
三部门发文完善幼儿园收费政策 ... 近年来,人民群众对适龄儿童“上得起”“上好园”的诉求越来越强烈,幼儿园收费政策需要与时俱进。近日,为...
下一阶段货币政策如何发力?央行... 中国网财经12月26日讯 近日中国人民银行货币政策委员会召开2025年第四季度例会,分析国内外经济金...
跨省盗杀家犬 涉嫌犯罪终落网 山西晚报·山河+讯(记者 辛戈)套牌的汽车、70余支已经装填或待用的毒针、50余颗用剧毒物质自制的药...