Happy Moment
A girl from elementary school messaged me today
Her: Are you graduating this year?
Me: Uh, I just graduated this year
She sent me a picture of herself holding a big orange cat
Her: My squinty has grown so big, does he look good?
Me: Move the cat out of the way, it's blocking the view and I can't see
Her: Are you an idiot? Get lost!
I explained: I was talking about the cat
But as soon as the message was sent, a red exclamation mark appeared with the note: the message was sent but rejected by the recipient
Kafka setup
For simplicity, we build a single kafka node based on docker. For various reasons, the domestic Docker Hub mirror accelerators are no longer available, so the more reliable approach at the moment is to set up a personal image registry; you can refer to: Docker cannot pull images — a solution. I've tried it and it works, but I'd like to add a few points
- Only the final image-copy step needs to be modified; nothing else needs to change
- Multiple image copies can be configured at once
Image copy
The format of the image copy command is:
skopeo copy docker://<namespace>/<image-name>:<TAG> docker://<aliyun-registry-address>/<namespace>/<image-name>:<TAG>
Take kafka as an example: search for it on Docker Hub and you get tens of thousands of results
Let's refine the search term a bit and search for
wurstmeister/kafka
Click into it; its Docker Hub address is
/r/wurstmeister/kafka
so its docker address is
docker://wurstmeister/kafka
The other images are found in the same way, so the final copy command looks something like this:
skopeo copy docker://wurstmeister/kafka:latest docker://<aliyun-registry-address>/qingshilu/wurstmeister_kafka:latest
If all goes well, you can then see the copied image in your AliCloud personal image repository
How to pull
Click the image name in your personal repository and you will see the Operation Guide
We only need its first two steps to pull the image down
Once the images are pulled, we can bring kafka up; since kafka depends on zookeeper, let's start zookeeper first
docker run -d --name zookeeper-test -p 2181:2181 \
--env ZOO_MY_ID=1 \
-v zookeeper_vol:/data \
-v zookeeper_vol:/datalog \
-v zookeeper_vol:/logs \
<aliyun-registry-address>/qingshilu/wurstmeister_zookeeper
then start kafka
docker run -d --name kafka-test -p 9092:9092 \
--env KAFKA_ZOOKEEPER_CONNECT=192.168.2.118:2181 \
--env KAFKA_ADVERTISED_HOST_NAME=192.168.2.118 \
--env KAFKA_ADVERTISED_PORT=9092 \
--env KAFKA_LOG_DIRS=/kafka/logs \
-v kafka_vol:/kafka \
<aliyun-registry-address>/qingshilu/wurstmeister_kafka
If nothing goes wrong, both containers are now up and running.
If something does go wrong, don't panic: use docker logs to check the container's startup logs and find the corresponding solution
# 1. First find the id of the container that failed to start
docker ps -a
# 2. Use docker logs to view the container's startup logs
docker logs <container-id>
If you need to enable SASL authentication for kafka, you can refer to: Docker-Compose setup of Kafka with SASL user/password authentication
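On the client side, if the broker does have SASL/PLAIN enabled, the consumer/producer properties would additionally need something along these lines (a sketch; <user> and <password> are placeholders for whatever you configured on the broker):
props.put("security.protocol", "SASL_PLAINTEXT");
props.put("sasl.mechanism", "PLAIN");
props.put("sasl.jaas.config",
        "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"<user>\" password=\"<password>\";");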
Kafka Tool
For details, see: Basic use of the kafka visualization client tool (Kafka Tool)
Create the Topic test-topic and send a message
At this point test-topic contains 1 message
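The message above was sent with Kafka Tool; if you'd rather send the test message from code, a minimal producer sketch looks like the following. It is my own addition, reusing the broker address 192.168.2.118:9092 and the topic test-topic from above:
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class MsgProducer {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.2.118:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // send one test message to test-topic and wait for the broker's acknowledgement
            producer.send(new ProducerRecord<>("test-topic", "hello kafka")).get();
        }
    }
}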
Consumer poll
The code is simple.
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

/**
 * @author: greenstone road
 */
public class MsgConsumer {

    private static final Logger LOGGER = LoggerFactory.getLogger(MsgConsumer.class);

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.2.118:9092");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test_group");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        // if kafka has no committed offset for the current consumer group, start from the earliest
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        // subscribe to the topic
        consumer.subscribe(Collections.singletonList("test-topic"));
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        LOGGER.info("records count = {}", records.count());
        records.forEach(record -> LOGGER.info("{} - {} - {}", record.topic(), record.offset(), record.value()));
        // consumer.commitSync();
        consumer.close();
    }
}
Let's run it; the output log is as follows
Why can't the poll fetch the message?
Let's adjust the code to poll in a loop
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    LOGGER.info("records count = {}", records.count());
    records.forEach(record -> LOGGER.info("{} - {} - {}", record.topic(), record.offset(), record.value()));
}
Let's run it again, the output log is as follows
When a consumer polls, it first checks whether it is already in the consumer group; if not, it joins the consumer group first. During the join, the ConsumerCoordinator rebalances the consumer group, and none of the consumers in the group can work while the rebalance is in progress. The poll call is given a timeout (100 milliseconds here); if the consumer has not finished joining the consumer group within that timeout, the poll cannot fetch any data. According to the logs, the consumer has joined the consumer group by the third poll, so from then on data can be polled.
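Besides polling in a loop, another option (the summary at the end also mentions increasing the timeout) is simply to give poll a timeout long enough to cover the group join; a minimal variation on the code above:
// a longer timeout leaves room for the group join/rebalance to finish,
// so even the first poll usually comes back with data
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
LOGGER.info("records count = {}", records.count());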
Many of you may have a question at this point
When I use kafka in my own projects, I never notice this problem, why?
There are several reasons for this
- The poll timeout is set long enough for the consumer to join the consumer group in time.
- Consumers are created when the project starts and live as long as the project does, so only the first few polls may come back empty because the consumer has not yet joined the consumer group; once it has joined, then as long as there is data in the Topic, the poll is sure to fetch it. Measured against the total number of polls, the proportion of abnormal cases (the Topic has data to pull but the poll gets nothing) is so small that it can be ignored.
That's why you don't notice it. But in certain scenarios, such as DataX reading data from kafka
(Heterogeneous Data Synchronization → Why should DataX support kafka?)
consumers are created again and again, so the proportion of polls that come back empty even though data is available goes up, and some mechanism is needed to reduce the impact, such as a retry mechanism; see the sketch below.
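A minimal sketch of such a retry, my own illustration rather than anything taken from DataX: keep the short poll timeout, but retry a bounded number of times and stop as soon as records come back (it reuses the consumer and LOGGER from the listings above; maxRetries is a made-up parameter):
int maxRetries = 5; // hypothetical upper bound on consecutive empty polls
ConsumerRecords<String, String> records = ConsumerRecords.empty();
for (int i = 0; i < maxRetries; i++) {
    records = consumer.poll(Duration.ofMillis(100));
    if (!records.isEmpty()) {
        // got data, stop retrying
        break;
    }
    // empty poll: either the Topic really has no data, or the consumer was
    // still joining the consumer group; try again
}
LOGGER.info("records count = {}", records.count());
records.forEach(record -> LOGGER.info("{} - {} - {}", record.topic(), record.offset(), record.value()));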
Summary
- Sample code: kafka-demo
- If you use docker a lot, it's recommended to set up a personal image registry to solve the image pull timeout problem.
- When a kafka consumer polls, if the consumer is not yet in the consumer group, it joins the consumer group first, so the poll may not fetch any data within the timeout; you can increase the timeout or add a retry mechanism to reduce the number of cases where the poll gets no data (a poll that returns nothing simply because the Topic has no data to pull does not count as an abnormal case).