
Hive Source Code Analysis

Popularity:664 ℃/2024-08-29 23:17:44

1. Overview

Apache Hive is a data warehousing tool built on top of Hadoop. It provides an SQL-like query language that lets users process and analyze large-scale data with simple SQL statements. In this article, we analyze the Apache Hive source code in depth and explore its key components and how they work, to better understand Hive's role in big data processing.

2. Content

Before starting the source code analysis, let's look at the overall architecture of Hive. Hive uses a table structure similar to that of traditional databases, but the underlying data is stored in the Hadoop Distributed File System (HDFS). Its architecture mainly consists of the metadata store, the query compiler, the execution engine, and other key components, as shown in the figure.

2.1 Understanding the Overall Architectural Components of Hive

Understanding Hive's overall architecture is the basis for analyzing its source code in depth. From the official Hive documentation and the structure of the source tree, we can identify the basic building blocks of the system: user interfaces, metadata storage, query processing, and data storage and computation. Together, these components form a flexible big data processing framework that lets users work with huge data sets stored in HDFS through SQL.

1. User Interface

Hive provides three main user interfaces: the command line (CLI), JDBC/ODBC clients, and a Web UI. The command line is the most commonly used. JDBC/ODBC clients connect to the Hive Server in much the same way as they would connect to a traditional database. The Web UI is accessed through a browser and offers a more intuitive graphical interface.

2. Metadata storage

Hive's metadata (MetaStore) is stored in a relational database, such as MySQL or Derby. This metadata includes the name of each table, its columns and partition attributes, its properties (e.g., whether or not it is an external table), and the directory in which the table's data resides.

3. Query processing flow

Hive's query processing involves an interpreter, a compiler, and an optimizer, which together handle lexical analysis, syntax analysis, compilation, optimization, and query plan generation for Hive SQL statements. The generated query plan is stored in HDFS and is subsequently executed by scheduling MapReduce tasks.

4. Data storage and computation

Hive's data is stored in HDFS, while most of the query and computation tasks are performed by MapReduce. It is worth noting that some queries may not generate MapReduce tasks; for example, for a query like SELECT * FROM stu, Hive is able to perform an efficient read operation. In this case, Hive directly scans the files in the storage directory associated with table stu and then outputs the query results. However, most computational operations involving data are realized through MapReduce.
Through the above architectural analysis, we are able to have a clearer understanding of Hive's workflow during data processing, including the selection of user interfaces, the management of metadata, the processing of query statements, and the way data is stored and computed. This helps developers better understand and optimize the performance and scalability of Hive in a big data environment.

2.2 Deep analysis of the Hive metadata storage mechanism

Regarding data storage, Hive is very flexible: it does not impose a dedicated storage format or build indexes on the data. Users can organize tables freely; they only need to specify the column and row delimiters when creating a table, and Hive can then parse the data. All data is stored in HDFS, and the storage structure mainly consists of databases, files, tables, and views. Hive's data model covers Table, External Table, Partition, and Bucket. By default, Hive supports loading text files directly, and it also supports various compressed formats such as GZIP, ZLIB, and SNAPPY.
In addition, Hive stores its metadata in a relational database management system (RDBMS), to which users can connect in three different modes. This design lets Hive integrate with a wide range of relational databases and makes metadata management scalable and customizable.

1. Embedded mode

This mode connects to a local embedded Derby database and is typically used for unit testing. The embedded Derby database can only access one data file at a time, which means it does not support multiple concurrent sessions. This configuration suits lightweight testing scenarios in which each test runs in a relatively independent database environment, which keeps tests isolated and repeatable. As shown in the figure.

2. Local mode

This mode essentially switches Hive's metadata storage from the built-in Derby database to a MySQL database. With this configuration, no matter how or where Hive is started, all nodes that connect to the same Hive service see consistent metadata, so the metadata is shared. As shown in the figure.

3. Remote mode

In remote mode, the MetaStore service runs in its own standalone JVM, not in the HiveServer's JVM. Other processes that want to communicate with the MetaStore server use the Thrift protocol to connect to the MetaStore service and access the metadata database through it. In production environments, it is highly recommended to run the Hive MetaStore in remote mode. With this configuration, other software that depends on Hive can access Hive through the MetaStore. Since this mode also completely shields the database layer, it brings better manageability and security. As shown in the figure.

Note that in remote mode, we need to configure parameters that explicitly specify the IP and port of the machine on which the MetaStore service is running. It is also necessary to manually start the Metastore service separately.
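
To make the remote-mode note above more concrete, here is a minimal sketch of the client-side setting. `hive.metastore.uris` is the Hive property that points clients at a remote MetaStore. The host name below is a placeholder, 9083 is only the conventional MetaStore port, and the class and method names are invented for this example. In practice the value would normally be written in hive-site.xml rather than built in code.

```java
import java.net.URI;
import java.util.Properties;

final class RemoteMetastoreConfig {

    // Hypothetical helper: builds the client-side property for a remote MetaStore.
    static Properties remoteMetastore(String host, int port) {
        if (host == null || host.isEmpty() || port <= 0 || port > 65535) {
            throw new IllegalArgumentException("invalid metastore address");
        }
        Properties props = new Properties();
        // Clients reach the standalone MetaStore service over Thrift.
        props.setProperty("hive.metastore.uris", "thrift://" + host + ":" + port);
        return props;
    }

    public static void main(String[] args) {
        // Placeholder host; replace with the machine that runs the MetaStore service.
        Properties props = remoteMetastore("metastore-host.example.com", 9083);
        URI uri = URI.create(props.getProperty("hive.metastore.uris"));
        System.out.println(uri.getScheme() + " " + uri.getHost() + " " + uri.getPort());
    }
}
```

The MetaStore service itself still has to be started separately, typically with `hive --service metastore`.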

2.3 In-depth analysis of how Hive works

Analyzing how Hive works gives us a thorough understanding of its internal mechanisms and of the big data processing paradigm behind it. This section follows a query from the user's request down to the underlying data storage, and from metadata management to the distributed computing engine.
Hive's core internal components are the metadata store, the query compiler, the execution engine, and data storage and computation. The metadata store manages table structures, partition information, and other metadata. The query compiler translates Hive SQL statements into MapReduce tasks, which the execution engine then schedules and executes on the Hadoop cluster.

1. Metadata storage

Responsible for storing and managing Hive's metadata, it uses a relational database to persist metadata information, which includes key information about table structure, partitioning information, and more.

2. Interpreters and compilers

This part converts the SQL statements submitted by the user into a syntax tree and then generates a chain of Jobs in the form of a DAG (Directed Acyclic Graph), which forms the logical plan. This process checks that the SQL query is valid and makes later optimization possible.

3. Optimizer

Hive's optimizer provides rule-based optimization, which includes column pruning, row filtering, predicate pushdown, and different Join strategies. Column pruning removes columns the query does not need. Row filtering is done in the TableScan phase, where Partition information is used to read only the Partitions that match. Predicate pushdown reduces the amount of data that later stages have to process. For Join, Hive supports Map-side Join, Shuffle Join, Sort Merge Join, and other strategies to suit different data distributions and processing needs.
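
The partition-based row filtering described above can be illustrated with a small, self-contained sketch. It is not Hive's optimizer code: the table layout, the `dt` partition column, the paths, and the class name are all assumptions made for this example. The point is only that the filter is applied to partition values first, so non-matching directories are never scanned.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

final class PartitionPruningSketch {

    // Keeps only the partition directories whose partition value passes the filter.
    static List<String> prune(Map<String, String> partitionDirs, Predicate<String> filter) {
        List<String> selected = new ArrayList<>();
        for (Map.Entry<String, String> e : partitionDirs.entrySet()) {
            if (filter.test(e.getKey())) {
                selected.add(e.getValue());
            }
        }
        return selected;
    }

    public static void main(String[] args) {
        // Hypothetical table partitioned by dt; the paths are placeholders.
        Map<String, String> dirs = new LinkedHashMap<>();
        dirs.put("2024-08-27", "/warehouse/stu/dt=2024-08-27");
        dirs.put("2024-08-28", "/warehouse/stu/dt=2024-08-28");
        dirs.put("2024-08-29", "/warehouse/stu/dt=2024-08-29");
        // Rough equivalent of: WHERE dt >= '2024-08-28'
        System.out.println(prune(dirs, dt -> dt.compareTo("2024-08-28") >= 0));
    }
}
```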

4. Executor

The executor converts the optimized DAG into MapReduce tasks and executes the Jobs in it in dependency order. Jobs that do not depend on each other are executed concurrently to improve overall efficiency. This phase turns the logical plan into actual MapReduce tasks and performs the corresponding data processing.
These core components work together to form the complete process of Hive's data processing in a big data environment. From metadata management to SQL query parsing and optimization to final execution, this series of steps clearly demonstrates the power of Hive in a distributed environment. As shown in the figure.
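
The way an executor can respect dependencies while still running independent Jobs concurrently can be sketched as follows. This is a generic topological grouping, not Hive's scheduler: the stage names and the `waves` helper are hypothetical, and every Job is assumed to appear as a key in the dependency map.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class JobDagSketch {

    // Groups jobs into "waves": all dependencies of a job sit in earlier waves,
    // so the jobs inside one wave could be submitted concurrently.
    static List<List<String>> waves(Map<String, List<String>> dependsOn) {
        Map<String, Integer> pending = new TreeMap<>();        // job -> unmet dependency count
        Map<String, List<String>> children = new HashMap<>();  // job -> jobs waiting on it
        for (Map.Entry<String, List<String>> e : dependsOn.entrySet()) {
            pending.put(e.getKey(), e.getValue().size());
            for (String parent : e.getValue()) {
                children.computeIfAbsent(parent, k -> new ArrayList<>()).add(e.getKey());
            }
        }
        List<List<String>> result = new ArrayList<>();
        while (!pending.isEmpty()) {
            List<String> ready = new ArrayList<>();
            for (Map.Entry<String, Integer> e : pending.entrySet()) {
                if (e.getValue() == 0) {
                    ready.add(e.getKey());
                }
            }
            if (ready.isEmpty()) {
                throw new IllegalStateException("cycle or unknown dependency");
            }
            for (String job : ready) {
                pending.remove(job);
                for (String child : children.getOrDefault(job, Collections.emptyList())) {
                    pending.merge(child, -1, Integer::sum);
                }
            }
            result.add(ready);
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, List<String>> deps = new LinkedHashMap<>();
        deps.put("stage1", Collections.emptyList());
        deps.put("stage2", Collections.emptyList());
        deps.put("stage3", Arrays.asList("stage1", "stage2"));
        deps.put("stage4", Arrays.asList("stage3"));
        System.out.println(waves(deps)); // stage1 and stage2 share the first wave
    }
}
```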

3. In-depth analysis of the working mechanism of Hive Driver

When reading a framework's source code, it is common practice to start at the program's entry point, focus on the core parts, and skip secondary details such as validation checks and exception handling. For Hive, the starting point is usually the execution script.
Before we dive into analyzing the Hive execution scripts, let's first take a look at the Hive source code directory structure, as shown in the figure.

First, we can look at the code that determines Hive's client mode, that is, whether the CLI or Beeline is used. The code is as follows:

# Check whether the SERVICE variable is empty
if [ "$SERVICE" = "" ] ; then
  # If SERVICE is empty, check whether the HELP variable is "_help"
  if [ "$HELP" = "_help" ] ; then
    # If HELP is "_help", set SERVICE to "help"
    SERVICE="help"
  else
    # Otherwise, set SERVICE to "cli"
    SERVICE="cli"
  fi
fi

# Check whether SERVICE is "cli" and USE_BEELINE_FOR_HIVE_CLI is "true"
if [[ "$SERVICE" == "cli" && "$USE_BEELINE_FOR_HIVE_CLI" == "true" ]] ; then
  # If both conditions are met, set SERVICE to "beeline"
  SERVICE="beeline"
fi

The bin/hive script loads the service scripts under bin/ext. One of them, cli.sh, contains the logic for starting either the Hive CLI or Beeline. The code is as follows:

# Set the THISSERVICE variable to "cli"
THISSERVICE=cli
# Append THISSERVICE to the SERVICE_LIST environment variable, followed by a space
export SERVICE_LIST="${SERVICE_LIST}${THISSERVICE} "

# Set the old CLI as the default client
# If USE_DEPRECATED_CLI is not set or is not equal to false, the old CLI is used
if [ -z "$USE_DEPRECATED_CLI" ] || [ "$USE_DEPRECATED_CLI" != "false" ]; then
  USE_DEPRECATED_CLI="true"
fi

# Define the updateCli function, which selects the CLI implementation
updateCli() {
  # If USE_DEPRECATED_CLI is "true", configure the old CLI
  if [ "$USE_DEPRECATED_CLI" == "true" ]; then
    export HADOOP_CLIENT_OPTS=" -Dproc_hivecli $HADOOP_CLIENT_OPTS " # Add configuration options
    CLASS=org.apache.hadoop.hive.cli.CliDriver # Set the class to the old CLI driver
    JAR=hive-cli-*.jar # Set the jar to the old CLI's jar
  else
    # Otherwise, configure the new CLI (Beeline)
    export HADOOP_CLIENT_OPTS=" -Dproc_beeline $HADOOP_CLIENT_OPTS -Dlog4j.configurationFile=beeline-log4j2.properties" # Add configuration options
    CLASS=org.apache.hive.beeline.cli.HiveCli # Set the class to the new CLI (Beeline) driver
    JAR=hive-beeline-*.jar # Set the jar to the new CLI's (Beeline) jar
  fi
}

# Define the cli function for executing Hive commands
cli () {
  updateCli # Call updateCli to select the CLI implementation
  execHiveCmd $CLASS $JAR "$@" # Execute Hive commands
}

# Define the cli_help function to display help information for Hive commands
cli_help () {
  updateCli # Call updateCli to select the CLI implementation
  execHiveCmd $CLASS $JAR "--help" # Execute the help command
}

As the script shows, when the Hive CLI is launched, the hive-cli-*.jar dependency is loaded and execution starts from the main method of the corresponding class. When Beeline is launched, the main method of the class in hive-beeline-*.jar is used instead. Below, we take the CliDriver class as the entry point for source code analysis.

The main method

In the CliDriver class, find the corresponding main method:

  public static void main(String[] args) throws Exception {
    // Entry point of the Hive CLI
    int ret = new CliDriver().run(args);
    // Exit the JVM with the returned code
    System.exit(ret);
  }

In the above code, the key is the return value ret, which plays an important role throughout the process. As we go on, it is worth summarizing the possible values of ret, so that the type of error can be roughly determined from the exit code. For example, 0 means a normal exit.
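
As a reading aid, the exit codes that run returns directly in the walkthrough that follows can be collected in one place. The helper below is a hypothetical summary written for this article, not part of Hive. It only covers the codes returned during startup; a non-zero code can also come back later from command execution.

```java
final class CliExitCodes {

    // Maps the startup exit codes discussed in this article to a short description.
    static String describe(int ret) {
        switch (ret) {
            case 0:
                return "normal exit";
            case 1:
                return "process_stage1 failed: system-level options could not be parsed";
            case 2:
                return "process_stage2 failed: user-level options could not be parsed";
            case 3:
                return "UTF-8 is not supported for the session streams";
            default:
                return "other failure, for example a command that returned a non-zero code";
        }
    }

    public static void main(String[] args) {
        for (int code = 0; code <= 4; code++) {
            System.out.println(code + ": " + describe(code));
        }
    }
}
```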

The run method

In the CliDriver class, find the corresponding run method:

public int run(String[] args) throws Exception {

    OptionsProcessor oproc = new OptionsProcessor();
    // Parse the system-level arguments
    if (!oproc.process_stage1(args)) {
      // Parsing failed, return an error code
      return 1;
    }
    // Other code omitted
}

Parameter validation via the process_stage1 method is a key step in the process. This method mainly validates system-level parameters such as hiveconf and hivevar. If these parameters cannot be parsed, process_stage1 fails and run returns ret = 1, so the rest of the startup flow does not proceed. It is therefore important to make sure these parameters are correct before executing Hive commands.
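
The "validate first, return 1 on failure" pattern can be sketched with a tiny hand-rolled parser. This is a simplified stand-in, not Hive's OptionsProcessor: the class, the method names, and the decision to handle only `--hiveconf key=value` pairs are assumptions made for illustration.

```java
import java.util.Properties;

final class Stage1Sketch {

    // Collects "--hiveconf key=value" pairs; returns null when an option is malformed.
    static Properties parseHiveConf(String[] args) {
        Properties props = new Properties();
        for (int i = 0; i < args.length; i++) {
            if (!"--hiveconf".equals(args[i])) {
                continue; // other options are left for a later stage
            }
            if (i + 1 >= args.length) {
                return null; // option without a value
            }
            String pair = args[++i];
            int eq = pair.indexOf('=');
            if (eq <= 0) {
                return null; // missing key or missing '='
            }
            props.setProperty(pair.substring(0, eq), pair.substring(eq + 1));
        }
        return props;
    }

    // Mirrors the shape of the check in run: a failed first stage yields exit code 1.
    static int run(String[] args) {
        return parseHiveConf(args) == null ? 1 : 0;
    }

    public static void main(String[] args) {
        System.out.println(run(new String[] {"--hiveconf", "hive.cli.print.header=true"}));
        System.out.println(run(new String[] {"--hiveconf", "broken"}));
    }
}
```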
Next, logging is initialized. This part is not the main focus, but it matters for troubleshooting: logging is set up early so that important information from the later stages of execution is recorded and can be used for debugging and bug tracking.

// Initialize logging; log4j is reinitialized here, before the rest of Hive's core classes are loaded
boolean logInitFailed = false;
String logInitDetailMessage;
try {
  logInitDetailMessage = LogUtils.initHiveLog4j();
} catch (LogInitializationException e) {
  logInitFailed = true;
  logInitDetailMessage = e.getMessage();
}

// Initialize the session state and its configuration
CliSessionState ss = new CliSessionState(new HiveConf(SessionState.class));
// Set the input stream
ss.in = System.in;
try {
  // Set the output stream
  ss.out = new PrintStream(System.out, true, "UTF-8");
  // Set the info stream
  ss.info = new PrintStream(System.err, true, "UTF-8");
  // Set the error stream
  ss.err = new CachingPrintStream(System.err, true, "UTF-8");
} catch (UnsupportedEncodingException e) {
  // UTF-8 is not supported, return an error code
  return 3;
}

In this section, the client-side session class CliSessionState is created first. It carries important data such as the SQL entered by the user and the result of executing that SQL. The standard input, output, and error streams are then initialized on this object. Note that if the environment does not support the UTF-8 character encoding, run returns ret = 3. This step matters because the environment's support for the encoding directly affects the correctness of later character processing and output.
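
The "return 3 when the encoding is unavailable" behaviour relies on the fact that `PrintStream`'s charset-name constructor throws `UnsupportedEncodingException`. A minimal sketch, using a hypothetical class in place of CliSessionState, shows the same pattern in isolation:

```java
import java.io.PrintStream;
import java.io.UnsupportedEncodingException;

final class SessionStreamsSketch {
    PrintStream out;
    PrintStream err;

    // Returns 0 on success and 3 when the requested charset is not available.
    int init(String charsetName) {
        try {
            out = new PrintStream(System.out, true, charsetName);
            err = new PrintStream(System.err, true, charsetName);
        } catch (UnsupportedEncodingException e) {
            return 3;
        }
        return 0;
    }

    public static void main(String[] args) {
        SessionStreamsSketch streams = new SessionStreamsSketch();
        System.out.println(streams.init("UTF-8"));
        System.out.println(streams.init("no-such-charset"));
    }
}
```

Since UTF-8 is one of the charsets every Java platform must support, this branch is rarely taken in practice.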
In process_stage2, the parameters are checked again, but note that the input differs between the two stages. process_stage1 takes the raw args, and the OptionsProcessor keeps everything it parsed from them. process_stage2 takes the CliSessionState object ss and assigns the saved values to it according to each option's key (the details are more involved, but they are not critical to the overall picture). process_stage2 is responsible for parsing user-level options such as `-e`, `-f`, `-v`, and `--database`. If parsing these options fails, run returns ret = 2 and the rest of the startup flow does not proceed.

// Parse the user-level arguments
if (!oproc.process_stage2(ss)) {
  // Parsing failed, return an error code
  return 2;
}

HiveConf is Hive's configuration class and manages Hive's configuration items. On the command line, the set command modifies the configuration of the current session, and this is implemented through the HiveConf object. prompt is the prompt string shown in the interactive shell, and it can be changed through configuration. Once the startup parameters have been validated at this stage, the CLI is about to enter the interactive shell, where the user can begin running queries and other operations interactively.

// Set all properties specified via the command line
HiveConf conf = ss.getConf();
for (Map.Entry<Object, Object> item : ss.cmdProperties.entrySet()) {
  conf.set((String) item.getKey(), (String) item.getValue());
  ss.getOverriddenConfigurations().put((String) item.getKey(),
      (String) item.getValue());
}

// Read the prompt configuration and substitute variables
prompt = conf.getVar(HiveConf.ConfVars.CLIPROMPT);
prompt = new VariableSubstitution(new HiveVariableSource() {
  @Override
  public Map<String, String> getHiveVariable() {
    return SessionState.get().getHiveVariables();
  }
}).substitute(conf, prompt);
prompt2 = spacesForString(prompt);
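
The two ideas in this snippet, substituting variables into the prompt and deriving a continuation prompt of the same width, can be tried out in isolation. The sketch below is a simplified stand-in and not Hive's VariableSubstitution: it only handles the `${hivevar:name}` form, and the class and method names are made up for this example.

```java
import java.util.Collections;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class PromptSketch {
    private static final Pattern VAR = Pattern.compile("\\$\\{hivevar:([^}]+)\\}");

    // Replaces ${hivevar:name} with its value; unknown variables are left untouched.
    static String substitute(String text, Map<String, String> hiveVars) {
        Matcher m = VAR.matcher(text);
        StringBuffer sb = new StringBuffer();
        while (m.find()) {
            String value = hiveVars.get(m.group(1));
            m.appendReplacement(sb, Matcher.quoteReplacement(value != null ? value : m.group()));
        }
        m.appendTail(sb);
        return sb.toString();
    }

    // Continuation prompt: blanks with the same width as the primary prompt.
    static String spacesFor(String prompt) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < prompt.length(); i++) {
            sb.append(' ');
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        String prompt = substitute("hive(${hivevar:env})", Collections.singletonMap("env", "dev"));
        System.out.println(prompt + "> select");
        System.out.println(spacesFor(prompt) + "> 1;");
    }
}
```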

Finally, we reach the core of run. With the CliSessionState (session information), HiveConf (configuration information), and OptionsProcessor (parameter information) all prepared, they are handed to executeDriver, which carries out the actual work. In other words, the user's session, Hive's configuration, and the command-line parameters are combined here and form the basis for everything that follows.

// Execute the CLI driver
try {
  return executeDriver(ss, conf, oproc);
} finally {
  ss.resetThreadName();
  ss.close();
}

The executeDriver method

This part mainly involves some initialization work. When starting Hive, if we specify a database, the processing is handed off to the processSelectDatabase method. The core of this method is to execute processLine("use " + database + ";"), which means to execute the use command to switch to the specified database. This step plays an important role in the initialization process because it allows the user to directly locate the specified database when starting Hive instead of using the default database.

CliDriver cli = new CliDriver();
cli.setHiveVariables(oproc.getHiveVariables());

// If a database is specified, switch to it
cli.processSelectDatabase(ss);

The processLine method

The processLine method takes a flag that controls interruption. Here it is passed as true, which means the user is allowed to interrupt the running operation with Ctrl + C, so processLine begins by setting up the interrupt handling. In essence, this registers a hook with the JVM that reacts to a signal and runs a specific piece of exit logic when the JVM exits. This lets the user interrupt the current operation with Ctrl + C during a Hive session while still running the appropriate cleanup so that resources are released properly. We can write a similar program by hand to see how this kind of signal handling works.

import java.util.Date;
import java.util.concurrent.TimeUnit;

public class ShutdownHookDemo {
    public static void main(String[] args) {
        // Register a shutdown hook; it runs when the JVM exits, for example on Ctrl + C
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            System.out.println("Program is terminating, running cleanup");
        }, "Hook simulation thread."));

        while (true) {
            System.out.println(new Date().toString() + ": Logical processing");
            try {
                // Sleep for 10 seconds
                TimeUnit.SECONDS.sleep(10);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
    }
}

The processCmd method

In this part of the core code, the current SessionState is obtained and the SQL is normalized: surrounding whitespace is trimmed and the statement is split into tokens on whitespace. The SQL and its tokens are then inspected to decide how the command should be handled. These checks may include syntax checking, semantic validation, and so on, so that the SQL entered by the user matches the expected format and can be executed correctly.

CliSessionState ss = (CliSessionState) SessionState.get();
ss.setLastCommand(cmd);

ss.updateThreadName();
// Flush the error stream so that it does not contain the output of the previous command
ss.err.flush();
String cmd_trimmed = HiveStringUtils.removeComments(cmd).trim();
// Split the statement into tokens on whitespace
String[] tokens = tokenizeCmd(cmd_trimmed);
int ret = 0;
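
To see what "split into tokens, then inspect them" can look like, here is a small standalone sketch. The whitespace split follows the idea in the snippet above, but the set of branches in `classify` (exit, source, shell command, everything else) is an assumption about the kind of checks involved, and all names are invented for this example.

```java
final class TokenizeSketch {

    // Trims the command and splits it on runs of whitespace.
    static String[] tokenize(String cmd) {
        return cmd.trim().split("\\s+");
    }

    // Decides how a command would be handled, based on its first token.
    static String classify(String cmd) {
        String trimmed = cmd.trim();
        String first = tokenize(trimmed)[0].toLowerCase();
        if (first.equals("quit") || first.equals("exit")) {
            return "exit";
        }
        if (first.equals("source")) {
            return "source-file";
        }
        if (trimmed.startsWith("!")) {
            return "shell";
        }
        return "processor"; // handed to a command processor, e.g. the SQL driver
    }

    public static void main(String[] args) {
        System.out.println(String.join("|", tokenize("  select *   from stu ")));
        System.out.println(classify("select * from stu"));
        System.out.println(classify("quit"));
    }
}
```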

The processLocalCmd method

This method drives the whole flow for a single statement: it parses the SQL, executes it, and finally prints the results. Along the way, the SQL statement entered by the user is parsed and converted into executable tasks, those tasks are run, and the results are presented to the user. In other words, this stage covers syntactic parsing, logical and physical execution, and result output, which makes it the main path of SQL query execution in Hive.

int processLocalCmd(String cmd, CommandProcessor proc, CliSessionState ss) {
  boolean escapeCRLF = HiveConf.getBoolVar(conf,
      HiveConf.ConfVars.HIVE_CLI_PRINT_ESCAPE_CRLF);
  int ret = 0;

  if (proc != null) {
    if (proc instanceof IDriver) {
      IDriver qp = (IDriver) proc;
      PrintStream out = ss.out;
      long start = System.currentTimeMillis();
      if (ss.getIsVerbose()) {
        out.println(cmd);
      }

      // Run the query; on failure, close the driver and return the error code
      ret = qp.run(cmd).getResponseCode();
      if (ret != 0) {
        qp.close();
        return ret;
      }

      // The query has run; compute the elapsed time
      long end = System.currentTimeMillis();
      double timeTaken = (end - start) / 1000.0;

      ArrayList<String> res = new ArrayList<String>();

      printHeader(qp, out);

      // Print out the results
      int counter = 0;
      try {
        if (out instanceof FetchConverter) {
          ((FetchConverter) out).fetchStarted();
        }
        while (qp.getResults(res)) {
          for (String r : res) {
            if (escapeCRLF) {
              r = EscapeCRLFHelper.escapeCRLF(r);
            }
            out.println(r);
          }
          counter += res.size();
          res.clear();
          if (out.checkError()) {
            break;
          }
        }
      } catch (IOException e) {
        console.printError("Failed with exception "
            + e.getClass().getName() + ":" + e.getMessage(),
            "\n" + org.apache.hadoop.util.StringUtils.stringifyException(e));
        ret = 1;
      }

      qp.close();

      if (out instanceof FetchConverter) {
        ((FetchConverter) out).fetchFinished();
      }

      console.printInfo(
          "Time taken: " + timeTaken + " seconds"
          + (counter == 0 ? "" : ", Fetched: " + counter + " row(s)"));
    } else {
      // Not a SQL driver: run the command through its own processor
      String firstToken = tokenizeCmd(cmd.trim())[0];
      String cmd_1 = getFirstCmd(cmd.trim(), firstToken.length());

      if (ss.getIsVerbose()) {
        ss.out.println(firstToken + " " + cmd_1);
      }
      CommandProcessorResponse res = proc.run(cmd_1);
      if (res.getResponseCode() != 0) {
        ss.out
            .println("Query returned non-zero code: "
            + res.getResponseCode() + ", cause: " + res.getErrorMessage());
      }
      if (res.getConsoleMessages() != null) {
        for (String consoleMsg : res.getConsoleMessages()) {
          console.printInfo(consoleMsg);
        }
      }
      ret = res.getResponseCode();
    }
  }

  return ret;
}
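
The result loop and the closing "Time taken" line can be reproduced in a few lines of plain Java. The sketch below imitates only the batching and the summary formatting seen in processLocalCmd; the iterator of batches stands in for the driver's result fetching, and the class and method names are hypothetical.

```java
import java.io.PrintStream;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

final class QuerySummarySketch {

    // Prints every row of every batch and returns the number of rows written.
    static int drain(Iterator<List<String>> batches, PrintStream out) {
        int counter = 0;
        while (batches.hasNext()) {
            List<String> batch = batches.next();
            for (String row : batch) {
                out.println(row);
            }
            counter += batch.size();
            if (out.checkError()) {
                break; // stop fetching once the output stream is broken
            }
        }
        return counter;
    }

    // Builds the summary line; the row count is omitted when nothing was fetched.
    static String summary(long startMillis, long endMillis, int fetchedRows) {
        double timeTaken = (endMillis - startMillis) / 1000.0;
        return "Time taken: " + timeTaken + " seconds"
            + (fetchedRows == 0 ? "" : ", Fetched: " + fetchedRows + " row(s)");
    }

    public static void main(String[] args) {
        List<List<String>> batches = Arrays.asList(Arrays.asList("1\tTom", "2\tAnn"), Arrays.asList("3\tBob"));
        int rows = drain(batches.iterator(), System.out);
        System.out.println(summary(0L, 1500L, rows));
    }
}
```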

In CliDriver, the methods call each other in a fairly involved way. The main steps are starting a CLI session, processing command-line input, and executing SQL statements, and the core methods cover parameter processing, session state management, SQL parsing, execution plan generation, and result output. Together, these calls make up the complete workflow of the Hive CLI, and understanding how they relate helps in grasping how the CLI works internally. As shown in the figure.

4. Summary

Focusing on Hive source code analysis, this article walked through the core process of Hive query processing. The analysis gives a step-by-step view of how Hive processes SQL queries and discusses the key details and functionality of each processing stage.

5. Concluding remarks

That's all for this post. If you run into any questions while studying this material, you can join the discussion group or send me an email, and I will do my best to answer them. Let's keep learning together!

The blogger has also published a new book, "Deeper Understanding of Hive". The earlier books "Kafka is not hard to learn" and "Hadoop Big Data Mining from Introduction to Advanced Practice" can be read alongside it. If you are interested, you can purchase the books through the buy link on the bulletin board. Thank you for your support. Follow the public account below and follow the prompts to get free instructional videos for the books.