This article is excerpted from the book "Data Asset Management Core Technologies and Applications" published by Tsinghua University Press, authored by Zhang Yongqing et al.
Getting Data Bloodlines from Spark Execution Plans
Because data processing tasks will involve data conversion and processing, parsing blood from data tasks is also one of the channels to obtain data blood.Spark is one of the most commonly used technology components for data processing in big data, and can do both real-time and offline tasks.Spark generates an execution plan when executing each SQL statement, which is is similar to what many databases do, which is that the SQL statement generates an execution plan when it is executed. As shown in Figure 3-1-10 below, in Spark's official documentation link /docs/latest/#content, there is a clear mention that you can get the execution plan based on the EXPLAIN keyword, which is similar to the way many databases view the execution plan.
Figure 3-1-10
The process of generating the execution plan and processing the execution plan in the underlying layer of Spark is shown in Figure 3-1-11 below. This article is excerpted from the book "Data Asset Management Core Technologies and Applications" published by Tsinghua University Press, authored by Zhang Yongqing et al.
Figure 3-1-11
As can be seen in the figure, the
1, the execution of SQL statements or Data Frame, it will become an Unresolved Logical Plan, that is, there has not been any processing and analysis of the logical execution of the plan, only from the point of view of the SQL syntax to do some basic checks.
2, and then through the acquisition of Catalog data, the need to execute the SQL statement to do further analysis of the table name, column name verification, so as to generate a logical execution plan can be run directly.
3. but Spark underlying will have an optimizer to generate an optimal way to execute the operation, so as to generate an optimized best logic execution plan.
4. The finalized logical execution plan is converted to a physical execution plan, which is converted to the final code for execution.
Spark's execution plan is actually the process plan for data processing, which will parse the SQL statement or DataFrame and combine it with Catalog to generate the final code for data transformation and processing. So you can get the data conversion logic from Spark's execution plan, so as to parse the bloodline of the data. But the execution plan of Spark are automatically processed inside the Spark bottom layer, how to get the information of the execution plan of each Spark task? In fact, there is a set of Listener architecture design in the bottom layer of Spark, you can use Spark Listener to get a lot of data information about the execution of the bottom layer of spark.
In the spark source code, a trait (Java-like interface) is provided in the form of Scala to act as a listener for the execution of tasks such as Spark SQL. The following two methods are provided in Table 3-1-2.
Table 3-1-2
method name |
descriptive |
def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit |
The method called on successful execution, which includes the execution plan parameter, where the execution plan can be either a logical or physical plan |
def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit |
The method called when execution fails, which also includes the execution plan parameter, where the execution plan can be either a logical or physical plan |
Therefore, you can borrow QueryExecutionListener to actively allow Spark to push the execution plan information to its own system or database when executing tasks, and then do further parsing, as shown in Figure 3-1-12 below. This article is excerpted from the book "Data Asset Management Core Technologies and Applications" published by Tsinghua University Press, authored by Zhang Yongqing et al.
Figure 3-1-12
import import import import case class PlanExecutionListener(sparkSession: SparkSession) extends QueryExecutionListener with Logging{ override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = withErrorHandling(qe) { // When execution succeeds, call the method that parses the execution plan planParser(qe) } override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = withErrorHandling(qe) { } private def withErrorHandling(qe: QueryExecution)(body: => Unit): Unit = { try body catch { case NonFatal(e) => val ctx = logError(s"Unexpected error occurred during lineage processing for application: ${} #${}", e) } } def planParser(qe: QueryExecution): Unit = { logInfo("----------- start to get spark analyzed LogicPlan--------") //Parses the execution plan and sends the execution plan data to its own system or database. ...... } }
The above code implements the QueryExecutionListener This trait in the onSuccess and onFailure these two methods, only in the onSuccess, you need to get the data of the execution plan, because only onSuccess when the bloodline is valid.
After implementing a custom QueryExecutionListener, you can register your own implementation of PlanExecutionListener into a Spark session by coming to the listenerManager, which is the manager of the Listener in Spark.
When the execution plan is obtained, it needs to be combined with Catalog again to further parse the bloodline data, as shown in Figure 3-1-13 below
Figure 3-1-13.
Spark in the common execution plan implementation class shown in the following table 3-1-3, to obtain the data lineage, is the need to resolve the lineage from the following these execution plans. This article is excerpted from the book "Data Asset Management Core Technologies and Applications" published by Tsinghua University Press, authored by Zhang Yongqing et al.
Table 3-1-3
Execution Plan Implementation Class |
descriptive |
Typically used to resolve field-level associations |
|
The execution plan for the Hive table association relationship, which is generally used for SQL execution where there is an associated query will appear. |
|
Generally the execution plan is generated when executing insert into SQL statement, such as insert into xxx_table(colum1,column2) values("4", "zhangsan") |
|
.InsertIntoHadoopFsRelationCommand |
Typically used to perform tasks like sparkSession .read .table("xx_source_table ") .limit(10) .write .mode() The execution plan generated by .insertInto("xx_target_table"). |
. CreateHiveTableAsSelectCommand |
Generally the execution plan is generated when executing the SQL statement create table xxx_table as, for example create table xx_target_table as select * from xx_source_table |
.CreateDataSourceTableAsSelectCommand |
Typically used to perform a sparkSession-like .read .table("xx_source_table") .limit(10) .write .mode() .saveAsTable("xx_target_table") generated execution plan. |
.InsertIntoDataSourceCommand |
Generally used to write SQL query results to a table, such as insert into xxx_target_table select * from xxx_source_table |
The following is based on
.InsertIntoHadoopFsRelationCommand as an example of the spark execution plan data, the following data has been converted to the original execution plan for the json format data, easy to do the show.
................. For more information, please refer to the book "Data Asset Management Core Technology and Application" published by Tsinghua University Press, authored by Zhang Yongqing et al.