Apache Spark 2.0 jobs take a long time to finish even after all tasks have completed

Phenomenon

When using Apache Spark 2.x, you may run into the following situation: all of the Spark jobs have completed, yet the program is still running. For example, we use Spark SQL to execute a statement that ends up producing a large number of files. We can see that every Spark job belonging to that statement has finished, but the statement itself is still executing. The log shows that the driver node is moving the files generated by the tasks into the final table directory one by one. This is easy to hit when a job generates many files. This article introduces a way to solve the problem.

Why does this phenomenon occur?

Spark 2.x uses Hadoop 2.x. When it saves the generated files to HDFS, it ultimately calls saveAsHadoopFile, which uses FileOutputCommitter to commit the output.
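
As a rough illustration, here is a minimal job that exercises this path (the application name and output path below are just examples). saveAsTextFile delegates to saveAsHadoopFile, so its output is committed through FileOutputCommitter:

    import java.util.Arrays;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaSparkContext;

    public class CommitterDemo {
        public static void main(String[] args) {
            SparkConf conf = new SparkConf().setAppName("committer-demo"); // name is arbitrary
            JavaSparkContext sc = new JavaSparkContext(conf);

            // saveAsTextFile goes through saveAsHadoopFile: each task writes to a
            // temporary attempt directory, and FileOutputCommitter decides when the
            // files are moved into the final output directory.
            sc.parallelize(Arrays.asList("a", "b", "c"), 3)
              .saveAsTextFile("hdfs:///tmp/committer-demo"); // example output path

            sc.stop();
        }
    }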

The problem lies in the Hadoop 2.x implementation of FileOutputCommitter. Two methods in FileOutputCommitter deserve attention: commitTask and commitJob. In Hadoop 2.x, the mapreduce.fileoutputcommitter.algorithm.version parameter controls how commitTask and commitJob work (for convenience, irrelevant statements are omitted below; the complete code can be found in FileOutputCommitter.java):
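
The sketch below is a simplified paraphrase of the relevant logic in Hadoop 2.x's FileOutputCommitter (org.apache.hadoop.mapreduce.lib.output), not the verbatim source; variable handling and error checks are abridged, so treat it as an outline of the two code paths rather than the actual code:

    // Paraphrased outline of FileOutputCommitter.commitTask / commitJob in Hadoop 2.x.
    public void commitTask(TaskAttemptContext context) throws IOException {
        // ... resolve the task attempt path and its FileStatus ...
        if (algorithmVersion == 1) {
            // v1: rename the task attempt directory to a per-task "committed" directory;
            // the data only reaches the final output directory later, in commitJob.
            Path committedTaskPath = getCommittedTaskPath(context);
            fs.rename(taskAttemptPath, committedTaskPath);
        } else {
            // v2: merge the task's output straight into the final output directory.
            mergePaths(fs, taskAttemptDirStatus, outputPath);
        }
    }

    public void commitJob(JobContext context) throws IOException {
        // ... resolve the final output path and its FileSystem ...
        if (algorithmVersion == 1) {
            // v1: the driver moves every committed task directory into the final
            // output directory, one by one -- this is the slow step we observe.
            for (FileStatus stat : getAllCommittedTaskPaths(context)) {
                mergePaths(fs, stat, finalOutput);
            }
        }
        // v2: nothing left to move; the tasks already wrote into the final directory.
        // ... write the _SUCCESS marker and clean up temporary directories ...
    }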

As you can see, the commitTask method contains a check on algorithmVersion == 1, where algorithmVersion is the value of the mapreduce.fileoutputcommitter.algorithm.version parameter, which defaults to 1. When this parameter is 1, a task that finishes first moves its temporarily generated data into a per-task committed directory, and the data is only moved into the final job output directory later, when commitJob is called. The default value of this parameter in Hadoop 2.x is 1, and commitJob is executed by Spark's driver, which moves the files one by one. This is why we see all the jobs completed while the program is still moving data, and why this final step is so slow.

We can also see that if we set mapreduce.fileoutputcommitter.algorithm.version to 2, then commitTask calls the mergePaths method to move the data generated by the task from its temporary directory directly into the program's final output directory. When commitJob runs, there is no data left to move, so it is naturally much faster than with the default value.

Note that in versions prior to Hadoop 2.7.0, we can achieve this by setting the mapreduce.fileoutputcommitter.algorithm.version parameter to a value other than 1, because the program does not limit this value to 2. However, as of Hadoop 2.7.0, the value of the mapreduce.fileoutputcommitter.algorithm.version parameter must be 1 or 2. For details, see MAPREDUCE-4815.

How to set this parameter in Spark

Now that the cause has been found, we can work around it in our programs. There are several ways (a combined sketch follows the list):

  • Set spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version 2 directly in conf/spark-defaults.conf. This takes effect globally.
  • Set it directly in the Spark program with spark.conf.set("mapreduce.fileoutputcommitter.algorithm.version", "2"); this applies at the job level.
  • If you use the Dataset API to write data to HDFS, you can set dataset.write.option("mapreduce.fileoutputcommitter.algorithm.version", "2").
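
A minimal sketch combining the second and third options (the application name and output path are invented for the example; the property key and value are the ones discussed above):

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;

    public class CommitterV2Demo {
        public static void main(String[] args) {
            SparkSession spark = SparkSession.builder()
                .appName("committer-v2-demo") // name is arbitrary
                // Same effect as spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version=2
                // in conf/spark-defaults.conf, but scoped to this application only.
                .config("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "2")
                .getOrCreate();

            // Job-level alternative: set it on the runtime configuration.
            spark.conf().set("mapreduce.fileoutputcommitter.algorithm.version", "2");

            Dataset<Row> df = spark.range(1000).toDF("id");

            // Per-write alternative: pass it as a write option.
            df.write()
              .option("mapreduce.fileoutputcommitter.algorithm.version", "2")
              .parquet("hdfs:///tmp/committer-v2-demo"); // example output path

            spark.stop();
        }
    }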

However, if your Hadoop version is 3.x, the default value of the mapreduce.fileoutputcommitter.algorithm.version parameter is already set to 2. For details, see MAPREDUCE-6336 and MAPREDUCE-6406.

Because this parameter has some impact on performance, starting with Spark 2.2.0 it is documented in Spark's configuration page (configuration.html); see SPARK-20107 for details.

Summary

That is all I have to share about this Apache Spark 2.0 behavior. I hope it is helpful to you!
