Location>code7788 >text

Volcano Engine VeDI Data Technology Sharing: Two Steps to Reduce Costs and Improve Efficiency for Parquet

Popularity:937 ℃/2024-07-30 19:45:37

More technical exchanges, job search opportunities, welcome to pay attention to theByte jumping data platform WeChat public number, reply [1]Enter the official communication group

Author: Wang Enze, Xu Qing
Volcano Engine LAS Team

 

Volcano Engine Digital Intelligence Platform VeDI is a new generation of enterprise data intelligence platform launched by Volcano Engine. Based on years of "data-driven" practical experience of ByteDance data platform, VeDI provides enterprises with end-to-end digital intelligence products, scenario-based industry solutions, and professional consulting on digital intelligence transformation.

 

Parquet has been widely used and deeply optimized within ByteDance. Combined with the two application scenarios of small file merging and column-level TTL, we have achieved the optimization of storage cost under high query performance.

 

Parquet-based cost reduction and efficiency optimization and application has been exported to the public through the Lake Warehouse All-in-One LAS of VeDI, a Volcano Engine Digital Intelligence platform.

Use of Parquet at ByteHop

 

ByteDance Offline Warehouse uses Parquet format for data storage by default.Parquet, as an open source file format for columnar storage, is widely used in the big data domain, and it provides a series of features, such as high compression, high query performance, etc., which are very suitable for the big data domain.

 

Additionally, in terms of data security, it provides modular encryption features to protect data while also taking into account high query performance. In addition to some basic capabilities provided by the communityByteDance also carries out in-depth optimization and application based on Parquet format, including LocalSort/PreWhere and other functions, which further improve the storage and query performance of Parquet. In terms of data security, ByteDance has also built a transparent encryption system based on Parquet, which encrypts and protects the underlying data without affecting the normal use of users.

 

In the actual production process, with the continuous growth of massive data, some problems have been encountered. One of the more typical ones is the small file problem and the storage cost problem. The small file problem refers to the existence of a large number of small files in the storage system. Since the offline storage of ByteDance adopts HDFS, the existence of a large number of small files will seriously affect the stability of the HDFS cluster and the efficiency of data access.

 

After analyzing, we found that most of the data in HDFS comes from Hive, so the target of governance will mainly focus on Hive data. In terms of storage cost, massive data brings high storage cost, and how to safely and efficiently control the storage cost is also a major difficulty in cost reduction and efficiency.

Small file merging

 

First describing some of the technical practices in the governance of small documents.It mainly covers the causes of the small document problem and the technical solutions to the small document problem.

How the small file problem arose

The small file problem may arise due to the data source itself, for example, some streaming tasks naturally produce some small files according to a certain time period.

 

It is also common for users to use distributed engines such as Spark to process data with too much concurrency, which can result in a large number of small files, which can be further exacerbated by dynamic partitioning if used at the same time.

 

Similar to the example shown in the figure below, where the final number of files produced is the number of concurrencies multiplied by the number of partitions, a single job can easily produce thousands or even tens of thousands of small files.

How to solve the small file problem

For the small file problem mentioned above, there are already some common solutions, such as using repartition to control the concurrency of the output; or use distribute by to control the distribution of data in the form of only one file per partition; in some cases even need to split the job into two separate processing to deal with different data scenarios.

 

All of these approaches are in general not flexible enough, are more intrusive to the business, and often involve tedious reference tuning, which affects work efficiency. For this reason, we propose a set of automated, declarative small file merging program. Users only need to enable small file merging through the parameter, and set the target file size, you can let the job automatically output the appropriate file size.

 

In addition to being simple and straightforward, this approach is also very efficient in terms of merging, which will be expanded in detail later in the principles. In addition, the program supports both static and dynamic partitions well.

 

The following is a description of how this feature is implemented. Assuming that the small file issue is not taken into account, for a normal Spark ETL job, the data is computed and then written to each partition directory of the target table, and then Spark triggers the update of the Hive table meta information, at which point the data is officially visible to the downstream.

 

And when a user turns on small file merge, we insert the small file merge operation before updating the metadata, that is, before the data is visible. Specifically, it checks whether the files under each partition meet the size requirement. If the files are found to be too small, small file merge will be triggered under this partition.

 

For those partitions that already fulfill the requirements, they are skipped without any operation. This on-demand merge is one reason for the high efficiency of the merge, and another reason is that we use fast merge technology.

The core of small file merging is how to merge multiple Parquet files under one partition into one, due to the special encoding rules of Parquet format, the file is divided into multiple functional sub-modules, we can't merge two Parquet files by splicing them together head to tail.

 

Conventional practice is to use Spark to read these small files, extract a line of records in the file, and then write a new file. In this process of reading and writing, it involves a lot of compression and decompression, encoding and decoding operations, which consume a lot of computing resources.

In order to increase the speed of merging, we adopt a fast merging method, which draws on the merge tool provided by the Parquet community to quickly merge multiple Parquet files into a single one, and the following is a description of its implementation.

 

Parquet files have a lot of content inside them, such as Footers, RowGroups, and so on. These can be categorized into 2 types, one is the actual data that is compressed and encoded, and the other is the metadata that records how the data is encoded and arranged.The basic idea of Fast Merge is to directly copy the original binary Data corresponding to the actual data (skipping the coding and decoding process), and then construct new meta-information based on the position of the data in the new file.

 

The meta-information construction process is very fast, so the overall overhead is similar to copying the entire file. It is worth noting that this merge does not merge RowGroups, so there is no significant improvement in compression or query performance, but it does greatly improve merge efficiency, and the reduction in the number of files ultimately reduces the pressure on the HDFS cluster.

 

After the performance test, the fast merge is about 14 times better than the normal merge. According to the actual operation of the online tasks, the average operation time of the job increased by only about 3.5% after the fast small file merge was enabled, so we can see that the impact on the business is very small.

In addition, in real production environments, Parquet files are encrypted for data security, and in order to ensure high query performance, the encrypted storage uses modular encryption, that is, each module in the file is encrypted separately.

 

This encryption preserves the basic structure of Parquet files, thus preserving the ability of Parquet to perform high-performance queries. However, it also brings new challenges during small file merging.

 

We can't copy the binary data directly as in the previous model, because the data of each file is encrypted based on different keys, and the key information is stored in the Footer of each file, and after copying the binary module directly to the target file, it can't be decrypted with the unified key in the new file.

 

To do this, we need to add decryption and re-encryption operations to the copy binary module, using the key from the original file to decrypt it and then encrypting it with the key from the new file, in addition to the original fast merge. The overall process is still based on copying the binary data, skipping redundant operations such as encoding and decoding to achieve a fast merge.

The above describes how to merge small files at the same time as the data output, in reality, there are a large number of historical small files on the HDFS cluster, for which we provide a stock small file merge tool to deal with, and its method of use is very simple: the user submits a SQL, and in the SQL, the user formulates the merged table, partition, and merge the target file size. after the SQL is submitted, the system starts a Spark job. After the SQL is submitted, the system will start a Spark job, and then combine the tools and processes mentioned just now to quickly merge the stock data.

Summary: We provide corresponding small file merging capabilities in both incremental and stock scenarios, which provides comprehensive governance of small files in a simple and efficient way, improves the health and stability of the entire cluster, and ultimately reduces machine costs and human operation and maintenance costs effectively.

Column level TTL

The above section describes the relevant practices in solving the small file problem. The next section describes the Parquet format's use in another ByteHopping practice of reducing costs and increasing efficiency - thecolumn-level TTL-related content.

Background on the emergence of column-level TTL

  1. With the development of business, the storage cost of massive data has gradually become a major pain point of offline warehouse. At present, the offline warehouse cleanup storage is only a partition-level row-level TTL program, similar to the use of alter table drop partition DDL to complete the overall deletion of partition data. For tables with a large time span of aggregation needs, you need to keep a long history of partitioning, and a lot of detail data in these historical partitions will not be used in the aggregation task, that is, there are a lot of low-frequency access fields in the historical partition. If you want to delete these fields that are no longer in use, the existing way is to read out the data through engines such as Spark and set the fields that need to be deleted to NULL in the overwrite way to complete.There are two drawbacks to this approach: the computational resource overhead of overwriting huge amounts of data is high;

 

  1. For large wide tables with many fields, users need to list all the fields in select and empty the fields that need to be deleted one by one, so the TTL task has high operation and maintenance costs.

Lightweight column-level TTL scheme

Aiming at the above pain points of business, combined with the characteristics of Parquet columnar storage.A lightweight column-level TTL scheme is proposed as shown below.

This solution directly copies the binary data of the columns to be retained in each RowGroup according to the Column chunck, skipping the coding and decoding process. For example, let's say the table has columns 1, 2, and 3, and now we need to delete the data in column 2.

 

The first thing to do is to construct a new schema, delete the column that needs TTL (column 2) from the original schema and use it as the schema of the new file, and then copy the data of column 1 and column 3 according to the Column Chunk from the original file to the new file. In column level TTL, because of deleting some column data, the new file size will become smaller, and it is easy to have the small file problem, so we support to merge the data of original multiple files into the same file.

 

This column-level TTL approach achieves a 14x+ speedup compared to the insert overwrite override approach. As mentioned earlier, insert overwrite requires all columns to be enumerated in the select. In order to make it easier for the business side to use this column-level TTL functionality, we have defined a new syntax to support column-level TTL: alter table ${} partition(${part_name}) drop columns(xxx ).

 

Users only need to specify the library table partitions that need to perform column-level TTL and the columns that need to be deleted, instead of listing the other columns that don't need to be deleted, and then submit this SQL to the data engine to trigger the execution of the column-level TTL task.

 

There is one more issue involved in the realization of column-level TTL functionality to the ground.It's how to efficiently discover which columns under which partitions are capable of column-level TTL.

 

The LAS team has developed a column-level bloodline analysis tool that supports rapid analysis of table history partition queries, and then automatically recommends columns that can perform column-level TTL and the corresponding TTL time, for example, if a certain table column A had basically no user queries 90 days ago, then column A can be TTL in the history partition 90 days ago; Column B 120 For example, if a table column A has no user query 90 days ago, then column A in the history partition 90 days ago can be TTL; Column B 120 days ago has no user query, so it can be TTL at column level.

 

Column-level TTL is mainly used in ByteDance for historical low-optimization data cleanup, cleanup of large JSON, large MAP type fields, or data cleanup of detailed logs. This feature has already cleaned up a large amount of useless historical data for the company and freed up a large storage space.

 

Combined with ByteDance's own experience, Volcano Engine Numerical Intelligence Platform VeDI continues to optimize and refactor Parquet, an open source technology, to further improve performance and take into account storage costs. In the future, VeDI will continue to provide more users with high-quality data technology services and help enterprises realize digital transformation by virtue of its excellent technical capabilities and rich industry experience.

 

click to jumpvolcano engineVeDI Learn more