To consume multiple Kafka topics in Python, you can use kafka-python, a popular Kafka client library. Below is a detailed code example showing how to create a Kafka consumer and consume multiple topics at the same time.
1. Environment setup
(1) Install Kafka and Zookeeper: Make sure Kafka and Zookeeper are installed and running.
(2) Install the kafka-python library: Install kafka-python with pip.
pip install kafka-python
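To confirm that the installation succeeded, a small check like the following may help. It is only a sketch: installed_version is a hypothetical helper name, and it relies on the standard-library importlib.metadata module (Python 3.8+) rather than on anything from kafka-python itself.

```python
from importlib import metadata
from typing import Optional


def installed_version(distribution: str) -> Optional[str]:
    """Return the installed version of a pip distribution, or None if it is absent."""
    try:
        return metadata.version(distribution)
    except metadata.PackageNotFoundError:
        return None


if __name__ == "__main__":
    version = installed_version("kafka-python")
    if version is None:
        print("kafka-python is not installed; run: pip install kafka-python")
    else:
        print(f"kafka-python {version} is installed")
```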
2. Sample Code
Here is a complete Python script showing how to create a Kafka consumer and consume multiple topics.
from kafka import KafkaConsumer
import json
import logging
# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
# Kafka configuration
bootstrap_servers = 'localhost:9092'  # Replace with your Kafka server address
group_id = 'multi-topic-consumer-group'
topics = ['topic1', 'topic2', 'topic3']  # Replace with the topics you want to consume
# Consumer configuration
consumer_config = {
    'bootstrap_servers': bootstrap_servers,
    'group_id': group_id,
    'auto_offset_reset': 'earliest',  # Start from the earliest offset when the group has no committed offset
    'enable_auto_commit': True,
    'auto_commit_interval_ms': 5000,
    'value_deserializer': lambda x: json.loads(x.decode('utf-8'))  # Assumes message values are UTF-8 encoded JSON
}
# Create the Kafka consumer
consumer = KafkaConsumer(**consumer_config)
# Subscribe to multiple topics
consumer.subscribe(topics)
try:
    # Infinite loop: keep consuming messages
    while True:
        for message in consumer:
            topic = message.topic
            partition = message.partition
            offset = message.offset
            key = message.key
            value = message.value
            # Log the consumed message
            logger.info(f"Consumed message from topic: {topic}, partition: {partition}, offset: {offset}, key: {key}, value: {value}")
            # You can add logic to handle messages here
            # process_message(topic, partition, offset, key, value)
except KeyboardInterrupt:
    # Catch Ctrl+C and close the consumer gracefully
    logger.info("Caught KeyboardInterrupt, closing consumer.")
    consumer.close()
except Exception as e:
    # Catch any other exception, log it, and close the consumer
    logger.error(f"An error occurred: {e}", exc_info=True)
    consumer.close()
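The commented-out process_message hook in the script could be filled in by routing each message to a per-topic handler. The following is a minimal sketch under stated assumptions: the handler functions, the HANDLERS table, and the payload fields are hypothetical, and only the topic names come from the example. It makes no Kafka calls, so it can be tried without a broker.

```python
import logging
from typing import Any, Callable, Dict, Optional

logger = logging.getLogger(__name__)


def handle_topic1(value: Dict[str, Any]) -> str:
    # Hypothetical handler: pretend topic1 carries order events.
    return f"order:{value.get('id')}"


def handle_topic2(value: Dict[str, Any]) -> str:
    # Hypothetical handler: pretend topic2 carries payment events.
    return f"payment:{value.get('id')}"


# Hypothetical routing table: topic name -> handler function.
HANDLERS: Dict[str, Callable[[Dict[str, Any]], str]] = {
    "topic1": handle_topic1,
    "topic2": handle_topic2,
}


def process_message(topic: str, partition: int, offset: int,
                    key: Any, value: Dict[str, Any]) -> Optional[str]:
    """Route a message to the handler registered for its topic."""
    handler = HANDLERS.get(topic)
    if handler is None:
        # No handler registered: log the position and skip instead of failing.
        logger.warning("No handler for topic %s (partition %s, offset %s)",
                       topic, partition, offset)
        return None
    return handler(value)
```

Inside the consume loop, the call would then be process_message(message.topic, message.partition, message.offset, message.key, message.value). A lookup table keeps the loop itself unchanged when topics are added or removed.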
3. Code Interpretation
(1) Logging configuration: Python's logging module is configured so that information produced during consumption can be recorded and used for debugging.
(2) Kafka configuration: Sets the address of the Kafka server, the consumer group ID, and the list of topics to consume.
(3) Consumer configuration: Sets the consumer parameters, including the offset reset policy (auto_offset_reset), automatic offset commits and their interval (enable_auto_commit, auto_commit_interval_ms), and the message deserializer (value_deserializer, which assumes here that messages are in JSON format).
(4) Creating the consumer: Creates a KafkaConsumer instance from the configuration.
(5) Subscribing to topics: The subscribe() method subscribes the consumer to multiple topics.
(6) Consuming messages: Consumes messages in an infinite loop and logs the details of each message (topic, partition, offset, key, and value).
(7) Exception handling: Catches KeyboardInterrupt (Ctrl+C) to close the consumer gracefully, and catches any other exception, logs it, and closes the consumer.
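The value_deserializer in the consumer configuration assumes that every message is valid JSON, so a single malformed message would raise an exception inside the consume loop. One possible alternative, sketched here as an assumption rather than as part of the original script, is a deserializer that returns None for payloads it cannot decode. The name safe_json_deserializer is hypothetical.

```python
import json
import logging
from typing import Any, Optional

logger = logging.getLogger(__name__)


def safe_json_deserializer(raw: Optional[bytes]) -> Any:
    """Decode UTF-8 JSON bytes; return None instead of raising on bad input."""
    if raw is None:
        # Defensive: a message may carry no value at all.
        return None
    try:
        return json.loads(raw.decode("utf-8"))
    except (UnicodeDecodeError, json.JSONDecodeError) as exc:
        logger.warning("Skipping undecodable payload: %s", exc)
        return None
```

It would replace the lambda in consumer_config as 'value_deserializer': safe_json_deserializer. The message-handling code then needs to treat a value of None as "skip this message".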
4. Run the script
Make sure that Kafka and Zookeeper are running and that you have created the corresponding topics in Kafka (topic1, topic2, topic3). Then run the script:
python kafka_multi_topic_consumer.py
The script starts consuming the specified topics and logs the details of each message to the console. You can modify the processing logic in the script as needed, for example to store the messages in a database or forward them to other services.
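As one illustration of custom processing logic, the sketch below stores consumed messages in a database. It assumes SQLite through the standard-library sqlite3 module, a hypothetical messages table, and hypothetical init_db and store_message helpers. None of these come from the original script, and a production system would likely use a different database.

```python
import json
import sqlite3
from typing import Any


def init_db(path: str = ":memory:") -> sqlite3.Connection:
    """Open the database and create the (hypothetical) messages table."""
    conn = sqlite3.connect(path)
    conn.execute(
        """
        CREATE TABLE IF NOT EXISTS messages (
            topic TEXT NOT NULL,
            kafka_partition INTEGER NOT NULL,
            kafka_offset INTEGER NOT NULL,
            value TEXT,
            PRIMARY KEY (topic, kafka_partition, kafka_offset)
        )
        """
    )
    return conn


def store_message(conn: sqlite3.Connection, topic: str, partition: int,
                  offset: int, value: Any) -> None:
    """Persist one message; a repeated (topic, partition, offset) is ignored."""
    # "with conn" wraps the insert in a transaction and commits on success.
    with conn:
        conn.execute(
            "INSERT OR IGNORE INTO messages VALUES (?, ?, ?, ?)",
            (topic, partition, offset, json.dumps(value)),
        )
```

The primary key on topic, partition, and offset is a deliberate choice: with automatic offset commits a message can be delivered again after a restart or rebalance, and INSERT OR IGNORE makes such a repeat harmless.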
5. Practical value
This sample code shows how to use the Python kafka-python library to consume multiple Kafka topics. It suits scenarios where data streams from different topics need to be processed together. For example, in a real-time data processing system, different topics may represent different types of data streams, and consuming several of them in one consumer makes it possible to integrate and process that data in one place. The example also demonstrates basic exception handling and logging, which help with debugging and monitoring in a production environment.