This article was written by Li Jingwei
Background
As demand for streaming tasks within the Douyin Group keeps growing, Flink SQL has been adopted at scale across many internal business lines as a low-cost way to build such tasks. There are now more than 30,000 streaming SQL tasks, and the resources they use and are allocated have reached one million cores.
Against the backdrop of reducing costs and increasing efficiency, the Streaming Computing team has carried out in-depth optimization of Flink SQL, both to relieve resource constraints and to meet the business demand for higher performance. This article focuses on that work and explains the main optimization ideas in detail.
Engine Optimization
Query Optimization
View Reuse
In streaming SQL, it is common to put shared computation logic in a view to make the SQL code more readable. Here a view is purely a logical concept: no stored view exists in the underlying implementation.
As shown in the following figure, scenario 1 indicates that there are multiple sink tables in the task, and the view is the logic of the window aggregation operation. Scenario 2 shows that the task needs to union two streams, and the view contains the logic of the normal aggregation operation.
In both scenarios, the user defines a generic view to perform the computation. Because different branches downstream query the view differently, the computation logic in the view is duplicated in different operators, resulting in duplicated resource overhead. The question is: why is the view not reused?
In Calcite's original logic, the query contained in a view is immediately converted into a relational expression tree. If more than one query accesses the same view, the result is several RelNode trees that are identical in content but are different Java objects. All subsequent optimizations then work on these separate subtree objects, which can no longer be merged back into a single tree.
- Multi-sink
In the multi-sink scenario, Calcite converts the view into multiple RelNode trees during logical plan generation. When the optimizer later partitions the plan into subgraphs, these RelNode trees are not placed in the same subgraph, so the view is not reused.
The fix therefore requires changes in both Calcite and Flink. In Calcite's SqlToRel conversion, the query in the view should not be converted into its RelNode tree immediately; instead, the conversion should return a LogicalTableScan that contains the corresponding Sql CatalogView table.
In Flink, the CatalogView implementation needs to store the LogicalTableScan object so that all downstream nodes refer to the same CatalogView. Before optimization, the view in the LogicalTableScan is expanded into a RelNode tree, so that downstream nodes all refer to the same RelNode tree object.
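The difference between eager and deferred view expansion can be illustrated with a minimal Python sketch. It is not the Calcite or Flink code: `RelNode`, `ViewScan`, `build_window_agg`, and `distinct_nodes` are hypothetical names that only model the idea that structurally equal trees are still distinct objects, while a cached placeholder hands every consumer the same object.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class RelNode:
    """Toy relational node: an operator name plus its input nodes."""
    op: str
    inputs: tuple = ()


class ViewScan:
    """Hypothetical placeholder playing the role of a scan over a catalog view."""

    def __init__(self, name, build_tree):
        self.name = name
        self._build_tree = build_tree
        self._expanded = None

    def expand(self):
        # Expand the view's query once and return the same object afterwards.
        if self._expanded is None:
            self._expanded = self._build_tree()
        return self._expanded


def build_window_agg():
    """Stands in for converting the view's query into a tree."""
    return RelNode("WindowAggregate", (RelNode("TableScan(source)"),))


def distinct_nodes(roots):
    """Count distinct node objects (by identity) reachable from the roots."""
    seen = set()
    stack = list(roots)
    while stack:
        node = stack.pop()
        if id(node) not in seen:
            seen.add(id(node))
            stack.extend(node.inputs)
    return len(seen)


# Eager expansion: each query that touches the view gets its own tree.
eager_a = build_window_agg()
eager_b = build_window_agg()
eager_plan = [RelNode("Sink1", (eager_a,)), RelNode("Sink2", (eager_b,))]

# Deferred expansion: both sinks receive the same expanded tree object.
view = ViewScan("v", build_window_agg)
shared_a = view.expand()
shared_b = view.expand()
shared_plan = [RelNode("Sink1", (shared_a,)), RelNode("Sink2", (shared_b,))]

print(distinct_nodes(eager_plan), distinct_nodes(shared_plan))
```

In the eager plan the aggregation and scan appear twice; in the shared plan they appear once, which is the effect the optimization aims for.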
- Union all
In a UNION ALL scenario, the view can be reused by adding a virtual sink node behind it, which turns the UNION ALL scenario into a multi-sink scenario. This keeps the view from being expanded into a RelNode tree early in the logical plan phase, so both inputs of the union refer to the same view object. The virtual sink nodes are removed after subgraph partitioning.
As these two scenarios show, after the view reuse optimization the computation logic in the view only needs to run once, and the overall CPU gain is 20%.
Remove Redundant Streaming Shuffle
Remove Redundant Streaming Shuffle eliminates unnecessary data redistribution in streaming scenarios. In batch scenarios, a shuffle incurs the cost of writing data to disk, and the community has already optimized this case. In streaming scenarios, a shuffle incurs serialization and network transfer overhead.
In the example below, a rank followed by an aggregation computes the average of the top 5 product prices in each category. Both the rank and the aggregate hash the data by id before processing, so the two operators have the same hash key. Once the data has been hashed for the rank operator, it does not need to be hashed again, so the second shuffle is redundant.
The shuffle is created while the physical plan is generated. The following figure shows how the SQL optimizer converts a SqlNode from a logical node to a physical node; in this process a shuffle is also called an exchange. The conversion is driven by rules: each logical node is visited to check whether it satisfies the conversion rules, and if it does not, an additional AbstractConvert is inserted.
The rule that generates Exchange checks whether the data distribution of the current node satisfies the requirement. If it does not, an Exchange node is added upstream of the node to establish the required distribution. Finally, the PhysicalExchange is converted into a hash shuffle for data distribution.
How can the redundant streaming shuffle be removed? The main idea is to follow the batch optimization of shuffle: during rule conversion, consider not only the node itself but also whether its input node already meets the requirement, passing the question on to the input.
The implementation of the rule judgment method for Physical RelNode is divided into the following two cases:
- For nodes that have no data distribution of their own (e.g., Calc and Correlate nodes), checking whether they can satisfy a required distribution only means checking whether their input contains the hash key.
- For nodes that have their own data distribution (e.g., Aggregate and Rank nodes), it is necessary to verify that this distribution satisfies the required one. As shown in the figure below, the first step is to check whether the aggregate node satisfies the required distribution, which means checking whether its input, the rank node, does. If the rank node does not, an exchange node is added upstream of it, after which the rank operator satisfies the requirement. Since rank and aggregate have the same hash key, the aggregate is satisfied as well.
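The two cases above can be sketched in Python. This is a simplified model under stated assumptions, not the Flink planner: plans are single-input chains, and `Node`, `provides`, `insert_exchanges`, and `describe` are hypothetical names. A node with `hash_keys=None` has no distribution of its own and only forwards its input's distribution if it keeps the key fields.

```python
from dataclasses import dataclass
from typing import Optional, Tuple


@dataclass
class Node:
    name: str
    input: Optional["Node"] = None
    hash_keys: Optional[Tuple[str, ...]] = None  # None: no distribution of its own
    fields: Tuple[str, ...] = ()


def provides(node, required):
    """Does the output of `node` already have the required hash distribution?"""
    if node is None:
        return False
    if node.hash_keys is not None:
        # Case 2: the node has its own distribution; compare the keys.
        return node.hash_keys == required
    # Case 1: pass-through node; it must keep the key fields, and the
    # question is handed on to its input.
    return set(required) <= set(node.fields) and provides(node.input, required)


def insert_exchanges(node):
    """Add an Exchange below a keyed node only when its input does not qualify."""
    if node is None or node.input is None:
        return node
    node.input = insert_exchanges(node.input)
    if node.hash_keys is not None and not provides(node.input, node.hash_keys):
        node.input = Node("Exchange", node.input, node.hash_keys, node.input.fields)
    return node


def describe(node):
    """List operator names from source to root."""
    names = []
    while node is not None:
        names.append(node.name)
        node = node.input
    return list(reversed(names))


key = ("category_id",)
cols = ("category_id", "price")

# Rank and Aggregate share the same hash key: one Exchange is enough.
source = Node("Source", fields=cols)
rank = Node("Rank", source, key, cols)
calc = Node("Calc", rank, None, cols)
agg = Node("Aggregate", calc, key, cols)
same_key_plan = describe(insert_exchanges(agg))

# A different aggregate key still gets its own Exchange.
source2 = Node("Source", fields=cols)
rank2 = Node("Rank", source2, key, cols)
agg2 = Node("Aggregate", rank2, ("price",), cols)
diff_key_plan = describe(insert_exchanges(agg2))

print(same_key_plan)
print(diff_key_plan)
```

With matching keys the only Exchange sits below the rank; the aggregate reuses that distribution through the Calc node.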
This method gives the Volcano model a better, cheaper candidate execution plan, and the Volcano model ultimately chooses the plan without the redundant Exchange. After the redundant streaming shuffle is removed, the hash connection between the rank and agg operators disappears and the two operators are chained together, resulting in an overall CPU gain of 24%. This also opens up the possibility of applying the MultipleInput operator optimization in streaming scenarios.
Query Execution Optimization
Streaming MultipleInput Operator
Building on Remove Redundant Streaming Shuffle, shuffles can be optimized further in join+join, join+agg, and join+union patterns.
As shown in the following figure, the agg1 hash key is the same as the join's left key and the agg2 hash key is the same as the join's right key, so the hash exchanges before the join can be changed to forward.
The current OperatorChain strategy does not support chaining multi-input operators, so it cannot avoid the serialization, deserialization, and possible network overhead caused by the redundant shuffle. The Streaming Computing team therefore uses the MultipleInput mechanism in streaming scenarios, merging a multi-input operator with its upstream and downstream operators into a single MultipleInputOperator.
Specifically, the optimization went through the following steps:
- First, the MultipleInputExecNode is constructed at the planner level.
  - The MultipleInputExecNode is derived from the ExecNode DAG, which is produced when the physical plan is converted into ExecNodes. Once the ExecNode DAG is available, a breadth-first search from the root node yields a topological ordering of the graph. The MultipleInputExecNode is built during the Convert ExecNode DAG pass and then placed into the ExecNode graph.
- After the StreamMultipleInputExecNode is generated, it is translated into a StreamMultipleInput transformation.
  - The transformation carries the information needed to create the MultipleInput operator; the sub-op information is stored through the TableOperatorWrapper.
- Then the Job Graph is generated, which requires the following two conditions to be met:
  - StreamConfig must support multiple inputs: the two fixed input serializers (TypeSerializer1 and TypeSerializer2) are generalized to a TypeSerializer[] array, which is mainly used for transferring state and key data.
  - The Stream Graph must be able to add MultipleInputOperator nodes. The method addMultipleInputOperator adds the properties of the transformation to a vertex, forming a node in the Stream Graph.
At runtime, the StreamingMultipleInputOperator is implemented, which has to take into account operator creation, data processing, state, timers and watermarks, barriers, checkpoints, and so on.
- Operator initialization:
  - Create not only the StreamingMultipleInputOperator but also the corresponding sub-ops;
  - A sub-op is essentially an AbstractStreamOperator with sub-op id = op id + index;
  - Create each sub-op object in createAllOperator and construct the inputs and outputs of the DAG.
- ProcessElement:
  - Ensure that the key is passed along while the data is processed.
- State:
  - The MultipleInputStreamOperator and its sub-ops share a state handler;
  - A new API, stateNameContext, resolves state name conflicts.
- Timer && Watermark:
  - The MultipleInputStreamOperator and its sub-ops share a timeServiceManager;
  - A new API, TimerNameContext, resolves timer name conflicts;
  - The timeServiceManager manages timers at sub-op granularity;
  - A combined watermark (Combindedwatermark) ensures watermark alignment.
- Barrier: little special handling is needed. The MultipleInput operator holds no buffered data internally, so checkpointing in topological order does not lose data. Note, however, that prepareSnapshotPreBarrier must be propagated from the MultipleInputStreamOperator to all sub-operators.
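The watermark alignment item in the list above can be illustrated with a small sketch. It assumes the usual rule that a multi-input operator may only advance its own watermark to the minimum of the watermarks seen on all inputs; the class below is a hypothetical Python model, not the Flink runtime class.

```python
class CombinedWatermark:
    """Tracks per-input watermarks and emits their minimum when it advances."""

    def __init__(self, num_inputs):
        self.per_input = [float("-inf")] * num_inputs
        self.emitted = float("-inf")

    def update(self, input_index, watermark):
        """Record a watermark from one input; return the new combined
        watermark if it advanced, otherwise None."""
        # Watermarks never move backwards on a single input.
        self.per_input[input_index] = max(self.per_input[input_index], watermark)
        combined = min(self.per_input)
        if combined > self.emitted:
            self.emitted = combined
            return combined
        return None


cw = CombinedWatermark(num_inputs=2)
steps = [
    cw.update(0, 10),  # input 1 has not reported yet: nothing to emit
    cw.update(1, 5),   # both inputs reported: combined watermark is 5
    cw.update(1, 20),  # input 0 is now the slower one: combined is 10
    cw.update(0, 8),   # stale watermark: ignored
]
print(steps)
```

Sub-ops inside the merged operator would then see a single, aligned watermark instead of racing ahead on the faster input.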
After this optimization, agg+join operations are merged into a MultipleInput operator, which yields a 10% CPU gain and also solves the problem of tasks failing to start because of insufficient network memory.
Optimization of Long Sliding Windows
- Long sliding windows and their underlying implementation logic
In Flink SQL, a sliding window is written as Hop(table, slide, size), where size is the window size and slide is the step by which the window advances. If the step is smaller than the window size, the windows overlap and a single element belongs to several windows.
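To make the overlap concrete, the following Python sketch lists the windows that contain a given timestamp. It assumes non-negative integer timestamps, windows aligned to multiples of the slide, and half-open intervals [start, end); `assign_hop_windows` is a hypothetical helper, not a Flink API.

```python
def assign_hop_windows(ts, slide, size):
    """Return the [start, end) windows of a hopping window that contain ts."""
    # Start of the latest window that can contain ts.
    start = ts - (ts % slide)
    windows = []
    # Walk backwards one slide at a time while the window still covers ts.
    while start > ts - size:
        windows.append((start, start + size))
        start -= slide
    return windows


overlapping = assign_hop_windows(ts=7, slide=2, size=6)
tumbling = assign_hop_windows(ts=7, slide=6, size=6)
print(overlapping)
print(tumbling)
```

With slide 2 and size 6 the element lands in size / slide = 3 windows; when slide equals size it lands in exactly one.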
Long windows cause trouble in sliding window computation. When computing UV over 7 days, 30 days, or similar periods with deduplication in high-traffic scenarios, data is delayed severely or cannot be emitted at all, and adding resources does not solve the problem.
Analysis of the underlying logic shows that the main performance bottleneck of sliding window computation is the merging of panes. A pane is the smallest unit of window computation; its size is the greatest common divisor of the window size and the step size, and in most cases the pane size is 1. Every time the sliding window fires, all panes under the current window have to be merged again. Because a long window contains a large number of panes, the performance overhead is very high.
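A short calculation shows how quickly the number of panes grows. The sketch below uses minutes as an assumed time unit, and `pane_size` and `panes_per_window` are hypothetical helper names.

```python
import math


def pane_size(size, slide):
    """A pane is the greatest common divisor of window size and slide."""
    return math.gcd(size, slide)


def panes_per_window(size, slide):
    """Number of panes that must be merged each time the window fires."""
    return size // pane_size(size, slide)


seven_days = 7 * 24 * 60  # window size in minutes
print(pane_size(seven_days, 1), panes_per_window(seven_days, 1))
print(pane_size(10, 4), panes_per_window(10, 4))
```

A 7-day window that slides every minute has 10,080 panes, all of which are merged on every firing under the original logic.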
- Optimization Ideas for Long Sliding Windows
The main optimization idea for this is to trade space for time:
- Define the global state in the window operator that stores the results of the current window calculation;
- Add a retractMerge method to the aggregate function to remove the data of the panes that have left the window when the window slides;
- Merge the data of the newly added panes when the next computation is triggered.
As shown in the figure below, when the window slides by 3 panes, the results of pane 1 to pane 3 are removed and the results of pane 11 to pane 13 are merged in. Only 6 panes are processed in total, compared with 10 panes for a full re-merge, which saves the computation of 4 panes.
Therefore, the optimization becomes more effective as the ratio of the window size to the sliding step size gets larger. The optimization resulted in an overall CPU gain of 60%.
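The incremental update can be sketched as follows. This is an illustrative Python model, not the Flink aggregate function interface: each pane accumulator is assumed to be a `Counter` of user ids so that it can be retracted, and `merge`, `retract_merge`, `full_merge`, and `slide_window` are hypothetical names.

```python
from collections import Counter


def merge(acc, pane):
    """Add a pane's per-user counts to the window state."""
    acc.update(pane)


def retract_merge(acc, pane):
    """Remove a pane's counts; users whose count drops to zero leave the state."""
    acc.subtract(pane)
    for user in [u for u, n in acc.items() if n <= 0]:
        del acc[user]


def full_merge(panes):
    """Original behaviour: merge every pane of the window from scratch."""
    acc = Counter()
    for pane in panes:
        merge(acc, pane)
    return acc


def slide_window(acc, expired, added):
    """Optimized behaviour: touch only the panes that left or entered."""
    for pane in expired:
        retract_merge(acc, pane)
    for pane in added:
        merge(acc, pane)
    return len(expired) + len(added)


# 13 panes; adjacent panes share one user so retraction is non-trivial.
panes = [Counter([f"u{i}", f"u{i + 1}"]) for i in range(13)]

state = full_merge(panes[0:10])                        # window over panes 1-10
touched = slide_window(state, panes[0:3], panes[10:13])  # slide by 3 panes
expected = full_merge(panes[3:13])                     # window over panes 4-13
print(touched, len(state), state == expected)
```

The state after the incremental update equals a full re-merge of the new window, while only 6 panes were touched instead of 10. The extra window-level state is the space paid for the time saved.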
Data processing (Format side)
Native Json Format
Currently, about 13,000 tasks within the Douyin Group use the JSON format, occupying nearly 700,000 cores. Even under a conservative estimate that JSON deserialization accounts for 5% of that usage, about 35,000 cores are spent on deserializing JSON in production, so there is a lot of room for optimization here.
The following diagram shows the main flow of data being read from a message queue (MQ) and eventually passed to downstream operators. Two of the major overheads are Json deserialization and serializing GeneralRowData to bytes.
To address these two major sources of resource consumption, optimization is carried out in the following two areas:
- For the JSON deserialization overhead: we use a C++ JSON parsing library that supports vectorization, and chose sonic-cpp, which is developed in-house, to improve performance.
- For the overhead of serializing to BinaryRowData: native methods directly produce the binary representation that BinaryRowData needs, and the BinaryRowData then points to that portion of the data, which eliminates the corresponding serialization overhead.
In the test set, native Json was able to achieve a CPU gain of 57%.
Optimization Practices
To make sure the engine optimizations deliver real benefits to the business side, the Streaming Computing team has done a lot of internal work so that the optimizations can go live stably, as described in more detail below.
- Tool layer
As shown in the framework diagram above, the bottom layer is the tool layer, which provides the following five capabilities:
a. Real-time reporting of SQL task meta-information;
b. An operator-granularity offline data warehouse, which provides task monitoring at operator granularity;
c. Commit-granularity DAG compatibility checks: optimizations that would affect task state recovery can be found in advance;
d. Gray release of optimization items by priority: risk exposure can be limited;
e. A data accuracy verification pipeline: ensures that optimization items going live do not cause data accuracy issues.
Based on these capabilities, the tool layer provides task monitoring at operator granularity while ensuring task stability and data accuracy.
- Optimization layer
At the optimization item level, existing optimizations are rolled out more widely or to full coverage, and many new optimizations are explored and rolled out as well.
- Engine & platform layer
At the engine and platform level, the team works with the business side to drive governance of existing jobs. Optimization items are configured on the platform side so that new jobs apply certain optimizations directly. At the same time, verified optimizations are turned on by default on the engine side.
After these optimizations, a final gain of more than 100,000 cores was achieved.
Future Outlook
In the future, the Streaming Computing team will continue to optimize Flink SQL and explore the best way to use state in joins. We will also keep exploring directions such as a unified stream-batch native engine.