
There are pullable messages in the Kafka Topic, so why can't they be polled?


Happy Moment

A girl I knew from elementary school messaged me today
Her: Have you graduated yet?
Me: Yeah, just graduated this year
She sent me a picture of a big orange cat in her arms
Her: My Mimi has grown so big, does she look good?
Me: Move the cat out of the way, it's blocking the view and I can't see
Her: Are you an idiot? Get lost!
I explained: "Oh, you meant the cat?"
But as soon as the message was sent, a red exclamation mark popped up: the message was sent but rejected by the recipient

Well, you never said the cat's name was Mimi...

Kafka setup

For simplicity, we build a single kafka node with docker. For various reasons, the domestic Docker Hub mirror accelerators are no longer available, so the more reliable approach at the moment is to set up a personal image repository; see: Docker can not pull the image solution. I've tried it and it works, but I'd like to add a few points

  1. Only the last step, the image copy, needs to be modified; nothing else needs to change

    [image: sync-image-example changes]

    Copying multiple images can be configured at once

  2. Image copy

    The format of the image copy command is:

    skopeo copy docker://namespace/image-name:TAG docker://<Aliyun registry address>/namespace/image-name:TAG

    Let's take kafka as an example. Search for it on Docker Hub and, boy, we get tens of thousands of results

    [image: tens of thousands of kafka images]

    Let's refine the search term a bit and search for wurstmeister/kafka

    [image: wurstmeister/kafka search results]

    Click into it; its Docker Hub address is:

    /r/wurstmeister/kafka

    Then its docker address is

    docker://wurstmeister/kafka

    The other images are found in the same way, so the final copy command looks like the following:

    skopeo copy docker://wurstmeister/kafka:latest docker://<Aliyun registry address>/qingshilu/wurstmeister_kafka:latest

    If all goes well, you can see the copied image in your Aliyun personal image repository

    [image: Aliyun personal image repository]
  3. How to pull

    Click on the image name in your personal repository and you will see the Operation Guide

    [image: how to pull]

    Let's just focus on the first two steps to pull down the image

    [image: filtering images by wurstmeister]

Once the images are pulled, we can bring kafka up; since kafka depends on zookeeper, let's start zookeeper first.

docker run -d --name zookeeper-test -p 2181:2181 \
--env ZOO_MY_ID=1 \
-v zookeeper_vol:/data \
-v zookeeper_vol:/datalog \
-v zookeeper_vol:/logs \
/qingshilu/wurstmeister_zookeeper

then start kafka

docker run -d --name kafka-test -p 9092:9092 \
--env KAFKA_ZOOKEEPER_CONNECT=192.168.2.118:2181 \
--env KAFKA_ADVERTISED_HOST_NAME=192.168.2.118 \
--env KAFKA_ADVERTISED_PORT=9092  \
--env KAFKA_LOG_DIRS=/kafka/logs \
-v kafka_vol:/kafka  \
/qingshilu/wurstmeister_kafka

Barring any surprises, both are up and running.

[image: kafka started successfully]

If something does go wrong, don't panic: use docker logs to check the container logs and find the corresponding solution

# 1. First find the id of the container that failed to start
docker ps -a
# 2. Use docker logs to view the container startup logs
docker logs <container id>

If you need to enable SASL authentication for kafka, you can refer to: Docker-Compose builds Kafka with SASL user password authentication

Kafka Tool

For details, see: Basic use of the kafka visualization client tool (Kafka Tool)

[image: Kafka Tool connected successfully]

Create the Topic test-topic and send a message

[image: create test-topic and send a message]

At this point there is 1 message in test-topic
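
If you'd rather create the message from code than from Kafka Tool, here is a minimal producer sketch (illustrative only; the class name MsgProducer and the "hello kafka" payload are made up, the broker address is the one used above):

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class MsgProducer {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.2.118:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // try-with-resources flushes and closes the producer on exit
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("test-topic", "hello kafka"));
        }
    }
}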

Consumer poll

The code is simple.

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

/**
 * @author: greenstone road
 */
public class MsgConsumer {

    private static final Logger LOGGER = LoggerFactory.getLogger(MsgConsumer.class);

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.2.118:9092");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test_group");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        // if kafka has no committed offset for this consumer group, start from the earliest
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        // subscribe to the topic
        consumer.subscribe(Collections.singletonList("test-topic"));
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        LOGGER.info("records count = {}", records.count());
        records.forEach(record -> LOGGER.info("{} - {} - {}", record.topic(), record.offset(), record.value()));
        // consumer.commitSync();
        consumer.close();
    }
}

Let's run it, the output log is as follows

[image: log, poll pulled no messages]

Why can't I get a message from the poll?


Let's adjust the code to loop through the poll

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    LOGGER.info("records count = {}", records.count());
    records.forEach(record -> LOGGER.info("{} - {} - {}", record.topic(), record.offset(), record.value()));
}

Let's run it again, the output log is as follows

[image: log after the while loop]

When a consumer polls, it first checks whether it is already in the consumer group; if not, it joins the consumer group first, and during that join the ConsumerCoordinator rebalances the group. While the rebalance is in progress, none of the consumers in the group can consume. Since poll is given a timeout (100 milliseconds here), if the consumer has not finished joining the group within that timeout, the poll returns no data. From the logs we can see that by the third poll the consumer had joined the consumer group, so from then on data could be pulled.
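
As a concrete illustration (a sketch reusing the consumer built above, not code from the original post): if the poll timeout is long enough to cover the group join and rebalance, even the very first call can come back with records.

// With a longer timeout, the join-group / rebalance can complete inside the first poll,
// so records can already come back on the first call (assuming the topic has messages).
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
LOGGER.info("records count = {}", records.count());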

Many of you probably have a question at this point

When I use kafka in my projects, I never run into this problem. Why is that?

There are several reasons for this

  1. The poll timeout is set long enough for the consumer to join the consumer group normally.
  2. Consumers are created when the project starts and live as long as the project does, so only the first few polls may come back empty because the consumer has not yet joined the consumer group; once it has joined, as long as there is data in the Topic, poll will get it. Measured over the total number of polls, the proportion of abnormal cases (there is pullable data in the Topic, but poll returns nothing) is so small that it can be ignored.

So you don't notice it; but in certain scenarios, such as DataX reading data from kafka

Heterogeneous Data Synchronization → Why should DataX support kafka?

consumers have to be created over and over, so the proportion of polls that fail to pull data rises, and some mechanism, such as retries, is needed to reduce the impact.
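
For example, a minimal retry sketch (a hypothetical helper, not taken from DataX): retry the poll a few times so that the group-join window is not mistaken for an empty Topic.

// Hypothetical helper: retry the poll a few times so the join-group/rebalance window
// is not mistaken for "no data in the Topic".
private static ConsumerRecords<String, String> pollWithRetry(KafkaConsumer<String, String> consumer, int maxRetries) {
    for (int i = 0; i < maxRetries; i++) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        if (!records.isEmpty()) {
            return records;
        }
    }
    return ConsumerRecords.empty();
}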

Summary

  1. Sample code: kafka-demo
  2. If you use docker a lot, it is recommended to set up a personal image repository to get around the image pull timeout problem.
  3. When a kafka consumer polls, if it is not yet in the consumer group, it joins the consumer group first, so a poll may return nothing within its timeout. You can increase the timeout or add a retry mechanism to reduce the number of such cases (a poll that returns nothing because the Topic simply has no data does not count as an abnormal case).