
A Kafka CRC Exception Causes Bloodshed


I. Overview of issues

Recently, the customer's production environment began to intermittently throw a Kafka CRC-related exception, as follows:

Record batch for partition skywalking-traces-0 at offset 292107075 is invalid, cause: Record is corrupt (stored crc = 1016021496, compute crc = 1981017560)

The error does not occur at regular intervals: sometimes it does not appear for half a day, and sometimes it appears 2 or 3 times within an hour.

This error causes Kafka's Consumer to hang: it can no longer consume subsequent messages, and the only way to recover is to restart the Consumer manually. It is a very serious error that makes production unavailable!
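For context, the error surfaces from the Consumer's poll() call; below is a minimal sketch of the consumption path (the bootstrap servers, group id, and topic handling are hypothetical placeholders, not the customer's actual configuration):

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.KafkaException;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

// Minimal sketch only: shows where the CRC error surfaces on the consumer side.
public class ConsumeLoopSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "10.0.0.70:9092");   // hypothetical
        props.put("group.id", "skywalking-consumer");        // hypothetical
        props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("skywalking-traces"));
        while (true) {
            try {
                ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(500));
                // ... process records ...
            } catch (KafkaException e) {
                // "Record is corrupt (stored crc = ..., compute crc = ...)" is thrown here;
                // the position does not move past the corrupt batch, so every subsequent
                // poll() fails at the same offset until the Consumer is restarted.
                e.printStackTrace();
            }
        }
    }
}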

Because this is one of the company's key insurance customers and the bug was so severe and shocking, senior leadership quickly became highly concerned. At every evening meeting (all attendees are leaders) I was asked individually about the progress on the problem. For a while, the pressure was immense!

II. Analysis and localization

2.1 Problem analysis

In another of the company's projects (call it customer A; the one we are currently handling is customer B), this error is not unfamiliar, and the error message is exactly the same. So could the two be the same bug? The answer is no.

  • Customer A: it does report a CRC exception, but it is stable and reproducible. Once it occurs, no matter how many times the Consumer is restarted and no matter which Consumer consumes the data, the CRC exception is reported again.
  • Customer B: the problem occurs only once. If you restart the Consumer manually, the problem disappears; if a different Consumer consumes the same data, it also disappears. It is a flash in the pan, never to return, like a ghost!

So this is not a routine problem; it is a bit tricky.

2.2 A bug in Kafka itself?

I have been active in the Kafka community for a long time, am a Kafka community Contributor, and worked on Alibaba's public cloud for many years, but I had never once encountered this error before!

As far as I can remember, this CRC error has not been seen for a long time. I went through all the CRC-related issues in the community's issue list and found that similar exceptions only appeared in versions earlier than 0.10, while customer B's production environment runs 2.8.2, the last release of the entire 2.x line; this exception has rarely been seen in Kafka 2.0 and later.

Kafka's own CRC-calculation logic is very simple, only a few dozen lines of Java code. In short, I believe the probability of a bug in Kafka itself is relatively low, so it is not worth investing significant effort in this area.
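For readers unfamiliar with the mechanism: each record batch (message format v2) carries a CRC-32C over the batch bytes, and the consumer recomputes the checksum over the bytes it actually received and compares it with the stored value. The following is only a minimal sketch of that principle, not Kafka's actual code:

import java.nio.ByteBuffer;
import java.util.zip.CRC32C;

// Sketch of the principle only, NOT Kafka's implementation.
// 'payload' stands for the part of a record batch covered by the checksum,
// 'storedCrc' for the CRC value carried in the batch header.
public class CrcCheckSketch {

    static boolean isValid(ByteBuffer payload, long storedCrc) {
        CRC32C crc32c = new CRC32C();         // message format v2 uses CRC-32C (Castagnoli)
        crc32c.update(payload.duplicate());   // recompute over the bytes actually received
        return crc32c.getValue() == storedCrc;
    }

    public static void main(String[] args) {
        ByteBuffer payload = ByteBuffer.wrap("example batch bytes".getBytes());
        CRC32C crc32c = new CRC32C();
        crc32c.update(payload.duplicate());
        long computed = crc32c.getValue();
        // If even a single bit flips in transit, the recomputed value no longer matches the stored one
        // and the consumer reports "Record is corrupt (stored crc = ..., compute crc = ...)".
        System.out.println("computed crc = " + computed + ", matches stored = " + isValid(payload, computed));
    }
}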

2.3 Analysis of the error pattern

Next we started analyzing the pattern of this exception, and after various rounds of troubleshooting we found it has the following characteristics:

  • Intermittent, probabilistic, unpredictable; it may not occur for half a day, or it may occur 2 or 3 times within an hour
  • Not reproducible; when a problematic message is consumed again, the CRC anomaly disappears
  • Customer B's site has 8 environments, and only one of them has this problem; a rough comparison with the other 7 shows the disk model, CPU model, and other hardware are consistent
  • The Brokers involved are not always the same and do not land on the same host
  • We suspected a faulty memory module, but checking the corresponding logs revealed no abnormal messages
  • We confirmed with the OS team that although we run our own OS build, the TCP stack has not been modified and is the native Linux implementation
  • We simulated network packet loss and packet corruption in our in-house environment but could not reproduce the CRC anomaly

So far everything seemed fine, and the cursory examination yielded no valuable clues. So we shifted our focus to the "network" and the "disk", two basic capabilities that seem unlikely to cause problems.

III. Network troubleshooting

3.1 Setting up packet captures

For network troubleshooting we have to capture TCP packets, that is, run the tcpdump command. But this is not as simple as running a single command on one machine: the current deployment has three Brokers and three Consumers, the exception surfaces on the Consumer side, and a single Consumer may hold connections to multiple Brokers.

Therefore, the idea is to start captures on all three Brokers and also on the Consumer side, and then check whether the data sent over TCP matches the data received.

We run the tcpdump command on the 3 Brokers and on the Consumer side. Since the problem occurs infrequently, the capture could produce very large dump files, so the command splits the capture into 500 MB files and keeps only the 5 most recent ones. The IPs of the 3 Brokers are as follows:

  • 10.0.0.70
  • 10.0.0.71
  • 10.0.0.72

And the 3 Consumer IPs are:

  • 10.0.0.18
  • 10.0.0.19
  • 10.0.0.20

First, execute the following commands on the Consumer side:

sudo nohup tcpdump -i vethfe2axxxx -nve host 10.0.0.70 or host 10.0.0.71 or host 10.0.0.72 -w broker_18.pcap -C 500 -W 5 -Z ccadmin &
sudo nohup tcpdump -i vethfe2axxxx -nve host 10.0.0.70 or host 10.0.0.71 or host 10.0.0.72 -w broker_19.pcap -C 500 -W 5 -Z ccadmin &
sudo nohup tcpdump -i vethfe2axxxx -nve host 10.0.0.70 or host 10.0.0.71 or host 10.0.0.72 -w broker_20.pcap -C 500 -W 5 -Z ccadmin &

Next, execute the following on each of the 3 Brokers:

sudo nohup tcpdump -i xxxxxxxxx1 -nve host 10.0.0.18 -w broker_18.pcap -C 500 -W 5 -Z ccadmin &
sudo nohup tcpdump -i xxxxxxxxx2 -nve host 10.0.0.19 -w broker_19.pcap -C 500 -W 5 -Z ccadmin &
sudo nohup tcpdump -i xxxxxxxxx3 -nve host 10.0.0.20 -w broker_20.pcap -C 500 -W 5 -Z ccadmin &

A brief explanation of what the command means:

  • -i vethfe2axxxx: the network interface to capture on (the NIC name goes here)
  • -nve host ...: the capture filter listing the IPs to listen to (-n: no name resolution, -v: verbose output, -e: print link-layer headers)
  • -w broker_18.pcap: write the captured packets to the file broker_18.pcap
  • -C 500: rotate to a new file roughly every 500 MB
  • -W 5: keep only the 5 most recent rotated files
  • -Z ccadmin: drop privileges to the specified user when writing the capture files

3.2 The anomaly surfaces

After waiting in front of the screen for nearly 2 hours, we finally captured an anomalous message, so we immediately downloaded the data and analyzed it. The first step was to use Wireshark to parse the message as sent and as received.

The total length of the message was 2,712 bytes, about 3 KB of data, but to our surprise 15 of those bytes differed between the two captures.

At this point, the problem was tentatively localized to the network transmission, but more verification work was still needed. Wireshark is a good tool, but it is not a silver bullet here: it can only parse Kafka's transport protocol; it cannot validate or reconstruct the message content.

(We also sampled 3 random messages from normal interactions and compared them; the data before and after network transmission was identical.)
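The byte-level comparison itself is simple; a minimal sketch of the kind of check we did (PayloadDiff is a hypothetical helper, and the two byte arrays are assumed to hold the same message as extracted from the sender-side and receiver-side captures):

import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: report the offsets at which the sent and received payloads differ.
public class PayloadDiff {

    static List<Integer> diffOffsets(byte[] sent, byte[] received) {
        List<Integer> diffs = new ArrayList<>();
        int len = Math.min(sent.length, received.length);
        for (int i = 0; i < len; i++) {
            if (sent[i] != received[i]) {
                diffs.add(i);
            }
        }
        return diffs;
    }

    public static void main(String[] args) {
        byte[] sent = {0x00, 0x01, 0x02, 0x03};
        byte[] received = {0x00, 0x7f, 0x02, 0x03};
        // In our capture, the 2,712-byte message showed 15 differing bytes.
        System.out.println("differing offsets: " + diffOffsets(sent, received));
    }
}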

3.3 Kafka protocol analysis

Kafka's protocols are divided into the transport protocol and the storage protocol, and I am fairly familiar with both, so I started by directly parsing the first Request as captured before and after transmission.

import org.apache.kafka.common.requests.FetchRequest;
import org.apache.kafka.common.requests.RequestHeader;

import java.nio.ByteBuffer;

public class MyTest {
    public static void main(String[] args) {

        // 08-30 15:06:40.820028
        String hexString =
            "0e 1f 48 2d 7e 32 06 82 25 e9 a3 d9 08 00 45 00 " +
                "00 ab 3f b3 40 00 40 06 35 ed 62 02 00 55 62 02 " +
                "00 54 eb 2e 23 85 68 57 32 37 b5 08 3f b2 80 18 " +
                "7d 2c c5 4a 00 00 01 01 08 0a b7 d1 e6 5c 26 30 " +
                "b9 11 00 00 00 73 00 01 00 0c 11 85 6f bf 00 15 " +
                "62 72 6f 6b 65 72 2d 31 30 30 32 2d 66 65 74 63 " +
                "68 65 72 2d 30 00 00 00 03 ea 00 00 01 f4 00 00 " +
                "00 01 00 a0 00 00 00 72 52 11 e0 11 85 6f bf 02 " +
                "13 5f 5f 63 6f 6e 73 75 6d 65 72 5f 6f 66 66 73 " +
                "65 74 73 02 00 00 00 15 00 00 00 03 00 00 00 00 " +
                "13 36 67 3a 00 00 00 03 00 00 00 00 00 00 00 00 " +
                "00 10 00 00 00 00 01 01 00";
        // Expected decode:
        // FetchRequestData(clusterId=null, replicaId=1002, maxWaitMs=500, minBytes=1, maxBytes=10485760,
        // isolationLevel=0, sessionId=1917981152, sessionEpoch=293957567,
        // topics=[FetchTopic(topic='__consumer_offsets', partitions=[FetchPartition(partition=21, currentLeaderEpoch=3,
        // fetchOffset=322332474, lastFetchedEpoch=3, logStartOffset=0, partitionMaxBytes=1048576)])], forgottenTopicsData=[], rackId='')

        // Strip the spaces from the hex dump
        StringBuilder sb = new StringBuilder();
        char[] charArray = hexString.toCharArray();
        for (char c : charArray) {
            if (c != ' ') {
                sb.append(c);
            }
        }
        hexString = sb.toString();

        // Decode the hex string into raw bytes
        byte[] byteArray = new byte[hexString.length() / 2];
        for (int i = 0; i < byteArray.length; i++) {
            byteArray[i] = (byte) Integer.parseInt(hexString.substring(2 * i, 2 * i + 2), 16);
        }
        ByteBuffer byteBuffer = ByteBuffer.wrap(byteArray);
        System.out.println("len is : " + byteBuffer.array().length);

        // Skip the 66 bytes of Ethernet + IP + TCP headers, then the 4-byte Kafka size field
        byteBuffer.get(new byte[66]);
        byteBuffer.getInt();

        RequestHeader header = RequestHeader.parse(byteBuffer);
        System.out.println(header);
        FetchRequest fetchRequest = FetchRequest.parse(byteBuffer, (short) 12);
        System.out.println(fetchRequest);
    }
}

The final output is as follows

len is : 185
RequestHeader(apiKey=FETCH, apiVersion=12, clientId=broker-1002-fetcher-0, correlationId=293957567)
FetchRequestData(clusterId=null, replicaId=1002, maxWaitMs=500, minBytes=1, maxBytes=10485760, isolationLevel=0, sessionId=1917981152, sessionEpoch=293957567, topics=[FetchTopic(topic='__consumer_offsets', partitions=[FetchPartition(partition=21, currentLeaderEpoch=3, fetchOffset=322332474, lastFetchedEpoch=3, logStartOffset=0, partitionMaxBytes=1048576)])], forgottenTopicsData=[], rackId='')

The content before and after the TCP transmission is identical, so the problem is not in the Request.

Next, we need to parse the Response's binary protocol; the code is as follows:

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.requests.FetchResponse;
import org.apache.kafka.common.requests.ResponseHeader;

import java.nio.ByteBuffer;
import java.util.LinkedHashMap;
import java.util.Map;

public class MyTest2 {
    public static void main(String[] args) {

        // 08-30 15:06:41.853611
        String hexString =
            "0e 1f 48 2d 7e 32 06 82 25 e9 a3 d9 08 00 45 00 " +
                "00 ab 3f b3 40 00 40 06 35 ed 62 02 00 55 62 02 " +
                "00 54 eb 2e 23 85 68 57 32 37 b5 08 3f b2 80 18 " +
                "7d 2c c5 4a 00 00 01 01 08 0a b7 d1 e6 5c 26 30 " +
                "b9 11 00 00 00 f9 11 85 6f c1 00 00 00 00 00 00 " +
                "00 72 52 11 e0 02 13 5f 5f 63 6f 6e 73 75 6d 65 " +
                "72 5f 6f 66 66 73 65 74 73 03 00 00 00 03 00 00 " +
                "00 00 00 00 15 09 0c 9c 00 00 00 00 15 09 0c 9c " +
                "00 00 00 00 00 00 00 00 00 ff ff ff ff 89 01 " +

                "      00 00 00 00 15 09 0c 9c 00 00 00 7c 00 00 " +
                "00 03 02 08 16 d8 10 00 00 00 00 00 00 00 00 01 " +
                "91 a2 1b 8f 3c 00 00 01 91 a2 1b 8f 3c ff ff ff " +
                "ff ff ff ff ff ff ff 00 00 00 00 00 00 00 01 92 " +
                "01 00 00 00 56 00 01 00 0b 43 43 4d 2d 65 6e 63 " +
                "72 79 70 74 00 16 43 53 53 2d 49 49 53 2d 53 69 " +
                "74 75 61 74 69 6f 6e 51 75 65 72 79 00 00 00 00 " +
                "30 00 03 00 00 00 00 00 00 00 00 ff ff ff ff 00 " +
                "00 00 00 01 91 a2 1b 8f 3b 00 " +

                "      00 00 00 00 15 00 00 00 00 00 00 13 36 67 " +
                "47 00 00 00 00 13 36 67 47 00 00 00 00 00 00 00 " +
                "00 00 ff ff ff ff 01 00 00 00 ";

//        String hexString =
//                "0e 1f 48 2d 7e 32 06 82 25 e9 a3 d9 08 00 45 00 " +
//                "00 ab 3f b3 40 00 40 06 35 ed 62 02 00 55 62 02 " +
//                "00 54 eb 2e 23 85 68 57 32 37 b5 08 3f b2 80 18 " +
//                "7d 2c c5 4a 00 00 01 01 08 0a b7 d1 e6 5c 26 30 " +
//                "b9 11 00 00 04 65 11 85 6f c0 00 00 00 00 00 00 " +
//                "00 72 52 11 e0 03 13 5f 5f 63 6f 6e 73 75 6d 65 " +
//                "72 5f 6f 66 66 73 65 74 73 02 00 00 00 15 00 00 " +
//                "00 00 00 00 13 36 67 3b 00 00 00 00 13 36 67 3b " +
//                "00 00 00 00 00 00 00 00 00 ff ff ff ff da 07 " +
//
//                "      00 00 00 00 13 36 67 3b 00 00 03 cd 00 00 " +
//                "00 03 02 5c 37 79 85 00 00 00 00 00 0b 00 00 01 " +
//                "91 a2 1b 8f 3c 00 00 01 91 a2 1b 8f 37 ff ff ff " +
//                "ff ff ff ff ff ff ff 00 00 00 00 00 00 00 0c 96 " +
//                "01 00 00 00 5a 00 01 00 0b 43 43 4d 2d 65 6e 63 " +
//                "72 79 70 74 00 16 43 53 53 2d 49 49 53 2d 53 69 " +
//                "74 75 61 74 69 6f 6e 51 75 65 72 79 00 00 00 00 " +
//                "30 00 03 00 00 00 00 00 00 00 00 ff ff ff ff 00 " +
//                "00 00 00 01 91 a2 1b 8f 3b 00 " +
//
//                "      00 00 00 00 15 00 00 00 00 00 00 13 36 67 " +
//                "47 00 00 00 00 13 36 67 47 00 00 00 00 00 00 00 " +
//                "00 00 ff ff ff ff 01 00 00 00 ";

        // Strip the spaces from the hex dump
        StringBuilder sb = new StringBuilder();
        char[] charArray = hexString.toCharArray();
        for (char c : charArray) {
            if (c != ' ') {
                sb.append(c);
            }
        }
        hexString = sb.toString();

        // Decode the hex string into raw bytes
        byte[] byteArray = new byte[hexString.length() / 2];
        for (int i = 0; i < byteArray.length; i++) {
            byteArray[i] = (byte) Integer.parseInt(hexString.substring(2 * i, 2 * i + 2), 16);
        }
        ByteBuffer byteBuffer = ByteBuffer.wrap(byteArray);
        System.out.println("len is : " + byteBuffer.array().length);

        // Skip the 66 bytes of Ethernet + IP + TCP headers, then the 4-byte Kafka size field
        byteBuffer.get(new byte[66]);
        byteBuffer.getInt();

        ResponseHeader responseHeader = ResponseHeader.parse(byteBuffer, (short) 0);
        System.out.println("responseHeader is " + responseHeader);
        FetchResponse<MemoryRecords> fetchResponse = FetchResponse.parse(byteBuffer, (short) 11);
        System.out.println(fetchResponse);
        LinkedHashMap<TopicPartition, FetchResponse.PartitionData<MemoryRecords>> map = fetchResponse.responseData();
        System.out.println("map size is : " + map.size());
        System.out.println("map is : " + map);
        for (Map.Entry<TopicPartition, FetchResponse.PartitionData<MemoryRecords>> entry : map.entrySet()) {
            System.out.println("TP is : " + entry.getKey());
            FetchResponse.PartitionData<MemoryRecords> value = entry.getValue();
            MemoryRecords records = value.records();
            // For each record batch: compare the stored CRC with the dynamically recomputed one
            records.batches().forEach(batch -> {
                System.out.println("isValid: " + batch.isValid());
                System.out.println("crc : " + batch.checksum());
                System.out.println("baseOffset : " + batch.baseOffset());
            });
        }
    }
}

A key part of this is looking at the logic of the 2 CRCs:

  • One is read from the protocol body, i.e., the CRC computed and stored by the Broker; comparing it before and after the transfer, this value is consistent
  • The other is recomputed dynamically from the message content. Before transmission, the recomputed CRC matches the stored value, but after the message is received, the recomputed CRC deviates from it (see the sketch below)
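In terms of the client classes used above, the two values correspond to batch.checksum() (the CRC carried in the batch header) and the recomputation performed inside batch.isValid(). A minimal sketch, assuming batchBytes holds the raw record-batch bytes extracted from the captured Response:

import org.apache.kafka.common.record.MemoryRecords;

import java.nio.ByteBuffer;

// Sketch: 'batchBytes' is assumed to hold the raw record-batch bytes from the captured Response.
public class CrcCompareSketch {

    static void compare(byte[] batchBytes) {
        MemoryRecords records = MemoryRecords.readableRecords(ByteBuffer.wrap(batchBytes));
        records.batches().forEach(batch -> {
            // checksum(): the CRC stored in the batch header, identical before and after the transfer
            System.out.println("stored crc    : " + batch.checksum());
            // isValid(): recomputes the CRC over the batch content and compares it with the stored one;
            // for the corrupted capture this returns false on the receiving side only
            System.out.println("recomputed ok : " + batch.isValid());
        });
    }
}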

The following figure shows the exception as it appeared in a real production business log.

We found that the computed CRC was 2481280076; we then dumped the binary data and decoded it with the protocol-parsing code above, and the result matched what the production environment reported.

IV. Disk investigation

We subsequently compared the content sent out over the network with the content stored on disk; they were identical, so the bug was not caused by the disk.
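The on-disk check can be done with the same client classes. A minimal sketch (the segment path is hypothetical; in practice we first copied the .log segment containing the suspect offset off the Broker):

import org.apache.kafka.common.record.FileRecords;

import java.io.File;
import java.io.IOException;

// Sketch: open a copied .log segment and verify the stored CRC of every batch on disk.
public class SegmentCrcCheck {
    public static void main(String[] args) throws IOException {
        // Hypothetical path to a copy of the segment containing the suspect offset
        File segment = new File("/tmp/skywalking-traces-0/00000000000292107000.log");
        FileRecords records = FileRecords.open(segment);
        records.batches().forEach(batch ->
            System.out.println("baseOffset=" + batch.baseOffset()
                + " storedCrc=" + batch.checksum()
                + " valid=" + batch.isValid()));
        records.close();
    }
}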

V. Conclusion

Since the problematic message shows a diff before and after the TCP transmission, while normally received messages are identical before and after transmission, the problem was localized to the network.

The network team also considers this behavior contrary to expectations and has already begun investigating.

To summarize: based on years of experience handling Kafka exceptions, from the moment this bug came in it did not feel like a problem in Kafka itself. The tricky part is that the exception is reported on the Kafka side, so we need to continuously improve our ability to clear ourselves, that is, to prove that Kafka is not at fault.