Requirement: create a standalone consumer and consume the data in a topic.
[root@hadoop101 kafka_2.12-2.2.1]# bin/kafka-topics.sh --bootstrap-server hadoop101:9092 --create --partitions 3 --replication-factor 3 --topic hh
[root@hadoop101 kafka_2.12-2.2.1]# bin/kafka-topics.sh --bootstrap-server hadoop101:9092 --describe --topic hh
Topic:hh PartitionCount:3 ReplicationFactor:3 Configs:segment.bytes=1073741824
Topic: hh Partition: 0 Leader: 0 Replicas: 0,1,2 Isr: 2,1,0
Topic: hh Partition: 1 Leader: 2 Replicas: 2,0,3 Isr: 3,0,2
Topic: hh Partition: 2 Leader: 3 Replicas: 3,2,1 Isr: 3,1,2

Note: in consumer API code, a consumer group id must be configured. When a consumer is started from the command line without a group id, a random group id is generated automatically.
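For comparison, the console consumer can be started with or without an explicit group id (these commands assume the same broker and topic as above):

```shell
# Console consumer without --group: a random group id
# (console-consumer-XXXXX) is generated automatically.
bin/kafka-console-consumer.sh --bootstrap-server hadoop101:9092 --topic hh

# Or pin the group id explicitly:
bin/kafka-console-consumer.sh --bootstrap-server hadoop101:9092 --topic hh --group test-consumer-group
```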
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;

public class CustomConsumer {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.38.23:9092");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Consumer group id (any name works)
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer-group");
        // Create the consumer
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        // Subscribe to the topic
        ArrayList<String> topics = new ArrayList<>();
        topics.add("hh");
        consumer.subscribe(topics);
        // Poll and consume
        while (true) {
            ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                System.out.println(consumerRecord.value());
            }
        }
    }
}
Spring Boot custom logging configuration to silence the Kafka consumer's DEBUG log output: add a logback.xml file under the resources directory.
<configuration>
    <property name="LOG_HOME" value="logs"/>
    <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
        <encoder>
            <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS}[%thread] %-5level %logger{50} - %msg%n</pattern>
        </encoder>
    </appender>
    <appender name="FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
            <fileNamePattern>${LOG_HOME}/%d{yyyy-MM-dd}.%i.log</fileNamePattern>
            <maxFileSize>50MB</maxFileSize>
            <maxHistory>30</maxHistory>
        </rollingPolicy>
        <encoder>
            <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS}[%thread] %-5level %logger{50} - %msg%n</pattern>
        </encoder>
    </appender>
    <!-- Raise the Kafka client loggers from DEBUG to INFO -->
    <logger name="org.apache.kafka" level="INFO"/>
    <root level="INFO">
        <appender-ref ref="CONSOLE"/>
        <appender-ref ref="FILE"/>
    </root>
</configuration>
Test by sending messages with a producer:
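The quickest check is the console producer shipped with Kafka (broker address as above; `--broker-list` is the flag name in this 2.2.x release):

```shell
# Console producer: each line typed is sent as one message to topic hh
bin/kafka-console-producer.sh --broker-list hadoop101:9092 --topic hh
```

The running CustomConsumer should print each message's value as it arrives.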

Requirement: create a standalone consumer that consumes the data in partition 0 of the topic.

① Kafka consumer consuming data from partition 0 of the topic:
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;

public class CustomConsumerPartition {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.38.23:9092");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Consumer group id (any name works)
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer-group");
        // Create the consumer
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        // Assign the specific partition of the topic (no group rebalancing)
        ArrayList<TopicPartition> topicPartitions = new ArrayList<>();
        topicPartitions.add(new TopicPartition("hh", 0));
        consumer.assign(topicPartitions);
        // Poll and consume
        while (true) {
            ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                System.out.println(consumerRecord.value());
            }
        }
    }
}
② Kafka producer sending data to partition 0 of the topic:
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;

public class CustomProducerCallbackPartitions {
    public static void main(String[] args) throws InterruptedException {
        // Producer configuration
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.38.23:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // Create the producer
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
        for (int i = 0; i < 5; i++) {
            // Send explicitly to partition 0 of topic "hh"
            kafkaProducer.send(new ProducerRecord<>("hh", 0, "", "hello,kafka" + i), new Callback() {
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception exception) {
                    if (exception == null) {
                        // Send succeeded
                        System.out.println("topic: " + recordMetadata.topic() + " -> partition: " + recordMetadata.partition());
                    } else {
                        // Send failed
                        exception.printStackTrace();
                    }
                }
            });
            Thread.sleep(2);
        }
        // Release resources
        kafkaProducer.close();
    }
}
③ Test: start the consumer program first, then start the producer program.

Requirement: verify that the data in a partition of a topic can only be consumed by one consumer within a consumer group.

① Create 3 consumers: make 2 copies of the base consumer code and start all of them in IDEA at the same time, which starts 3 consumers in the same consumer group.
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;

public class CustomConsumer {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.38.23:9092");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Consumer group id: all 3 copies must share this name to form one group
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer-group");
        // Create the consumer
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        // Subscribe to the topic
        ArrayList<String> topics = new ArrayList<>();
        topics.add("hh");
        consumer.subscribe(topics);
        // Poll and consume
        while (true) {
            ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                System.out.println(consumerRecord.value());
            }
        }
    }
}
② Producer sends messages:
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;

public class CustomProducerCallbackPartitions {
    public static void main(String[] args) throws InterruptedException {
        // Producer configuration
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.38.23:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // Optional: plug in a custom partitioner
        // properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "com.hh.producer.MyPartitioner");
        // Create the producer
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
        for (int i = 0; i < 50; i++) {
            // No partition or key given, so the default partitioner picks the partition
            kafkaProducer.send(new ProducerRecord<>("hh", "hello,kafka" + i), new Callback() {
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception exception) {
                    if (exception == null) {
                        // Send succeeded
                        System.out.println("topic: " + recordMetadata.topic() + ", sent to partition: " + recordMetadata.partition());
                    } else {
                        // Send failed
                        exception.printStackTrace();
                    }
                }
            });
            Thread.sleep(2);
        }
        // Release resources
        kafkaProducer.close();
    }
}
③ Test: start the 3 consumer programs first, then start the producer program.

You can see that the 50 messages are split among the different consumers in the group: each consumer reads only the partitions assigned to it, so no message is consumed twice within the group.
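The split happens because the group coordinator hands each consumer a contiguous range of partitions. The following is a minimal sketch of that range-style assignment, not Kafka's actual RangeAssignor implementation; class and method names are made up for illustration:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Sketch of range-style partition assignment: partitions of one topic are
// split into contiguous ranges, one range per consumer in the group.
public class RangeAssignSketch {
    // Returns consumer id -> list of partition numbers for a single topic.
    static Map<String, List<Integer>> assign(int numPartitions, List<String> consumers) {
        Map<String, List<Integer>> result = new LinkedHashMap<>();
        int perConsumer = numPartitions / consumers.size(); // base count per consumer
        int extra = numPartitions % consumers.size();       // first `extra` consumers get one more
        int next = 0;
        for (int i = 0; i < consumers.size(); i++) {
            int count = perConsumer + (i < extra ? 1 : 0);
            List<Integer> parts = new ArrayList<>();
            for (int j = 0; j < count; j++) {
                parts.add(next++);
            }
            result.put(consumers.get(i), parts);
        }
        return result;
    }

    public static void main(String[] args) {
        // Topic "hh" has 3 partitions and the group has 3 consumers: one each,
        // so each of the 50 messages is seen by exactly one group member.
        System.out.println(assign(3, List.of("c0", "c1", "c2")));
    }
}
```

With 3 partitions and 3 consumers every consumer owns exactly one partition; with more consumers than partitions, the surplus consumers would sit idle.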