Detailed explanation of 4 common data sources in Spark SQL

Generic load/write methods

Spark SQL's DataFrame interface supports operations on multiple data sources. A DataFrame can be operated on in the same way as RDDs and can also be registered as a temporary table. After registering the DataFrame as a temporary table, you can execute SQL queries on the DataFrame.
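
For example, a minimal sketch of registering a DataFrame as a temporary view and querying it (the df variable and the view name "people" are illustrative):

scala> df.createOrReplaceTempView("people")
scala> spark.sql("SELECT * FROM people").show()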

The default data source format for Spark SQL is Parquet. When the data source is a Parquet file, Spark SQL can easily perform all of its operations on it.

Modify the configuration item spark.sql.sources.default to change the default data source format.
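
For example, a minimal sketch of changing the default format for the current session (the value "json" is only illustrative):

scala> spark.conf.set("spark.sql.sources.default", "json")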

scala> val df = spark.read.load("hdfs://hadoop001:9000/namesAndAges.parquet")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
scala> df.select("name").write.save("names.parquet")

Manually specify options

When the data source is not a Parquet file, you need to specify the data source format manually by its fully qualified name (for example, org.apache.spark.sql.parquet). For built-in formats you only need the short name: json, parquet, jdbc, orc, libsvm, csv or text.

You can use the read.load method provided by SparkSession for generic loading of data, and write.save for saving data, combining them with format to specify the data source:

scala> val peopleDF = spark.read.format("json").load("hdfs://hadoop001:9000/people.json")
peopleDF: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
scala> peopleDF.write.format("parquet").save("hdfs://hadoop001:9000/namesAndAges.parquet")

In addition, you can run SQL directly on the file:

val sqlDF = spark.sql("SELECT * FROM parquet.`hdfs://hadoop001:9000/namesAndAges.parquet`")
sqlDF.show()

File saving options

SaveMode can be passed when saving data to define how existing data should be handled. It is important to note that these save modes do not use any locking and are not atomic operations. In addition, when Overwrite is used, the original data is deleted before the new data is written. The save modes are described in the following table:

Scala/Java                        | Any Language        | Meaning
SaveMode.ErrorIfExists (default)  | "error" (default)   | Throw an error if the data already exists
SaveMode.Append                   | "append"            | Append the new data to the existing data
SaveMode.Overwrite                | "overwrite"         | Overwrite the existing data
SaveMode.Ignore                   | "ignore"            | If data already exists, ignore the write (do nothing)
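
A minimal sketch of applying a save mode (the target path is illustrative; peopleDF is the DataFrame loaded earlier):

import org.apache.spark.sql.SaveMode
// overwrite any data that already exists at the target path
peopleDF.write.mode(SaveMode.Overwrite).parquet("hdfs://hadoop001:9000/namesAndAges.parquet")
// the string form is equivalent: .mode("overwrite"), .mode("append"), .mode("ignore"), .mode("error")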

Parquet Files

Parquet read and write

Parquet format is often used in the Hadoop ecosystem, and it also supports all data types of Spark SQL. Spark SQL provides methods to directly read and store Parquet format files.

// Encoders for most common types are automatically provided by importing spark.implicits._
import spark.implicits._
val peopleDF = spark.read.json("examples/src/main/resources/people.json")
// DataFrames can be saved as Parquet files, maintaining the schema information
peopleDF.write.parquet("hdfs://hadoop001:9000/people.parquet")
// Read in the parquet file created above
// Parquet files are self-describing so the schema is preserved
// The result of loading a Parquet file is also a DataFrame
val parquetFileDF = spark.read.parquet("hdfs://hadoop001:9000/people.parquet")
// Parquet files can also be used to create a temporary view and then used in SQL statements
parquetFileDF.createOrReplaceTempView("parquetFile")
val namesDF = spark.sql("SELECT name FROM parquetFile WHERE age BETWEEN 13 AND 19")
namesDF.map(attributes => "Name: " + attributes(0)).show()
// +------------+
// | value|
// +------------+
// |Name: Justin|
// +------------+

Parsing partition information

Partitioning a table is a common way to optimize data storage. In a partitioned table, data is stored in different directories according to the partition columns. Parquet data sources can automatically discover and parse this partition information. For example, if population data is partitioned by the gender and country columns, the directory structure looks like this:

path
└── to
    └── table
        ├── gender=male
        │   ├── ...
        │   │
        │   ├── country=US
        │   │   └── data.parquet
        │   ├── country=CN
        │   │   └── data.parquet
        │   └── ...
        └── gender=female
            ├── ...
            │
            ├── country=US
            │   └── data.parquet
            ├── country=CN
            │   └── data.parquet
            └── ...

By passing path/to/table to SQLContext.read.parquet or SQLContext.read.load, Spark SQL will automatically extract the partition information from the directory paths.
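
For example, a minimal sketch (assuming the layout above was written under hdfs://hadoop001:9000/path/to/table):

val partitionedDF = spark.read.parquet("hdfs://hadoop001:9000/path/to/table")
partitionedDF.printSchema()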

The schema of the returned DataFrame is as follows:

root
|-- name: string (nullable = true)
|-- age: long (nullable = true)
|-- gender: string (nullable = true)
|-- country: string (nullable = true)

Note that the data types of the partition columns are inferred automatically; currently, numeric and string types are supported. Automatic partition type inference is controlled by the parameter:

spark.sql.sources.partitionColumnTypeInference.enabled

The default value is true.

If you want to disable this feature, simply set this parameter to false. In that case, the partition columns will default to the string type and no type inference will be performed.
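
A minimal sketch of disabling the inference for the current session:

spark.conf.set("spark.sql.sources.partitionColumnTypeInference.enabled", "false")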

Schema Merge

Like ProtocolBuffer, Avro, and Thrift, Parquet also supports Schema evolution. Users can first define a simple Schema, and then gradually add column descriptions to the Schema. In this way, users can obtain multiple Parquet files with different schemas but compatible with each other. The Parquet data source now automatically detects this situation and merges the schemas of these files.

Because schema merging is a costly operation and is not necessary in most cases, Spark SQL has disabled this feature by default since 1.5.0. You can enable this feature in two ways:

a When reading Parquet data, set the data source option mergeSchema to true (as in the example code below).

b Set the global SQL option spark.sql.parquet.mergeSchema to true (see the sketch below).
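
For the global option, a minimal sketch (set it before reading the Parquet data):

spark.conf.set("spark.sql.parquet.mergeSchema", "true")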

// spark (SparkSession) and sc (SparkContext) from the previous examples are used here.
// This is used to implicitly convert an RDD to a DataFrame.
import spark.implicits._
// Create a simple DataFrame, stored into a partition directory
val df1 = sc.makeRDD(1 to 5).map(i => (i, i * 2)).toDF("single", "double")
df1.write.parquet("hdfs://hadoop001:9000/data/test_table/key=1")
// Create another DataFrame in a new partition directory,
// adding a new column and dropping an existing column
val df2 = sc.makeRDD(6 to 10).map(i => (i, i * 3)).toDF("single", "triple")
df2.write.parquet("hdfs://hadoop001:9000/data/test_table/key=2")
// Read the partitioned table
val df3 = spark.read.option("mergeSchema", "true").parquet("hdfs://hadoop001:9000/data/test_table")
df3.printSchema()
// The final schema consists of all 3 columns in the Parquet files together
// with the partitioning column appeared in the partition directory paths.
//root
// |-- single: int (nullable = true)
// |-- double: int (nullable = true)
// |-- triple: int (nullable = true)
// |-- key : int (nullable = true)

Hive Data Source

Apache Hive is the SQL engine on Hadoop, and Spark SQL can be compiled with or without Hive support. Spark SQL with Hive support provides access to Hive tables, UDFs (user-defined functions), the Hive query language (HiveQL/HQL), and more. It is worth emphasizing that you do not need to install Hive in advance in order to include the Hive libraries in Spark SQL. Generally speaking, it is best to compile Spark SQL with Hive support so that all of these features are available. If you downloaded a binary release of Spark, it should already have been compiled with Hive support.

To connect Spark SQL to a deployed Hive, you must copy hive-site.xml to the Spark configuration file directory ($SPARK_HOME/conf). Spark SQL can run even if Hive is not deployed.

Note that if you have not deployed Hive, Spark SQL will create its own Hive metastore (metadata database) named metastore_db in the current working directory. Additionally, if you try to create tables with the CREATE TABLE (not CREATE EXTERNAL TABLE) statement in HiveQL, the tables will be placed in the /user/hive/warehouse directory on your default file system (HDFS if an hdfs-site.xml is on the classpath, otherwise the local file system).

import java.io.File
import org.apache.spark.sql.Row
import org.apache.spark.sql.SparkSession
case class Record(key: Int, value: String)
// warehouseLocation points to the default location for managed databases and tables
val warehouseLocation = new File("spark-warehouse").getAbsolutePath
val spark = SparkSession
.builder()
.appName("Spark Hive Example")
.config("spark.sql.warehouse.dir", warehouseLocation)
.enableHiveSupport()
.getOrCreate()
import spark.implicits._
import spark.sql
sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")
// Queries are expressed in HiveQL
sql("SELECT * FROM src").show()
// +---+-------+
// |key| value|
// +---+-------+
//|238|val_238|
//| 86| val_86|
//|311|val_311|
// ...
// Aggregation queries are also supported.
sql("SELECT COUNT(*) FROM src").show()
// +--------+
// |count(1)|
// +--------+
// | 500 |
// +--------+
// The results of SQL queries are themselves DataFrames and support all normal functions.
val sqlDF = sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key")
// The items in DataFrames are of type Row, which allows you to access each column by ordinal.
val stringsDS = sqlDF.map {
case Row(key: Int, value: String) => s"Key: $key, Value: $value"
}
stringsDS.show()
// +--------------------+
// | value|
// +--------------------+
// |Key: 0, Value: val_0|
// |Key: 0, Value: val_0|
// |Key: 0, Value: val_0|
// ...
// You can also use DataFrames to create temporary views within a SparkSession.
val recordsDF = spark.createDataFrame((1 to 100).map(i => Record(i, s"val_$i")))
recordsDF.createOrReplaceTempView("records")
// Queries can then join DataFrame data with data stored in Hive.
sql("SELECT * FROM records r JOIN src s ON r.key = s.key").show()
// +---+------+---+------+
// |key| value|key| value|
// +---+------+---+------+
// | 2| val_2| 2| val_2|
// | 4| val_4| 4| val_4|
// | 5| val_5| 5| val_5|
// ...

Embedded Hive Application

If you want to use the built-in Hive, there is nothing extra to do; just use it directly. If necessary, the warehouse location can be specified when starting Spark via --conf:

spark.sql.warehouse.dir=<path>

Note: if you are using the internal Hive, then from Spark 2.0 onward spark.sql.warehouse.dir specifies the location of the data warehouse. If you want the warehouse to live on HDFS, you need to add core-site.xml and hdfs-site.xml to the Spark conf directory; otherwise the warehouse directory is only created on the master node, and queries will then fail with file-not-found errors. In that case, switch to HDFS, delete the old metastore, and restart the cluster.
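
For example, a minimal sketch of starting the spark shell with an explicit warehouse location (the HDFS path is illustrative):

$ bin/spark-shell --conf spark.sql.warehouse.dir=hdfs://hadoop001:9000/user/hive/warehouse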

External Hive Application

If you want to connect to an externally deployed Hive, you need to complete the following steps.

a Copy or soft link the hive-site.xml file from Hive into the conf directory under the Spark installation directory.

b Open the spark shell with the JDBC driver needed to access the Hive metastore database on the classpath.

$ bin/spark-shell --master spark://hadoop001:7077 --jars mysql-connector-java-5.1.27-bin.jar

JSON Dataset

Spark SQL can automatically infer the schema of a JSON dataset and load it as a Dataset[Row] (a DataFrame). You can use SparkSession.read.json() to load either a JSON file or a Dataset[String]. Note that the file is not a typical multi-line JSON document: each line must contain a separate, self-contained JSON object.

{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}
// Primitive types (Int, String, etc) and Product types (case classes) encoders are
// supported by importing this when creating a Dataset.
import spark.implicits._
// A JSON dataset is pointed to by path.
// The path can be either a single text file or a directory storing text files
val path = "examples/src/main/resources/people.json"
val peopleDF = spark.read.json(path)
// The inferred schema can be visualized using the printSchema() method
peopleDF.printSchema()
//root
// |-- age: long (nullable = true)
// |-- name: string (nullable = true)
// Creates a temporary view using the DataFrame
peopleDF.createOrReplaceTempView("people")
// SQL statements can be run by using the sql methods provided by spark
val teenagerNamesDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19")
teenagerNamesDF.show()
// +------+
// | name|
// +------+
// |Justin|
// +------+
// Alternatively, a DataFrame can be created for a JSON dataset represented by
// a Dataset[String] storing one JSON object per string
val otherPeopleDataset = spark.createDataset(
"""{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}""" :: Nil)
val otherPeople = spark.read.json(otherPeopleDataset)
otherPeople.show()
// +---------------+----+
// | address|name|
// +---------------+----+
// |[Columbus,Ohio]| Yin|
// +---------------+----+

JDBC

Spark SQL can create a DataFrame by reading data from a relational database over JDBC. After a series of computations on the DataFrame, the data can be written back to the relational database.

Note that you need to put the relevant database driver on the Spark classpath.

$ bin/spark-shell --master spark://hadoop001:7077 --jars mysql-connector-java-5.1.27-bin.jar
// Note: JDBC loading and saving can be achieved via either the load/save or jdbc methods
// Loading data from a JDBC source
val jdbcDF = spark.read.format("jdbc").option("url", "jdbc:mysql://hadoop001:3306/rdd").option("dbtable", "rddtable").option("user", "root").option("password", "hive").load()
import java.util.Properties
val connectionProperties = new Properties()
connectionProperties.put("user", "root")
connectionProperties.put("password", "hive")
val jdbcDF2 = spark.read
.jdbc("jdbc:mysql://hadoop001:3306/rdd", "rddtable", connectionProperties)
// Saving data to a JDBC source
jdbcDF.write
.format("jdbc")
.option("url", "jdbc:mysql://hadoop001:3306/rdd")
.option("dbtable", "rddtable2")
.option("user", "root")
.option("password", "hive")
.save()
jdbcDF2.write
.jdbc("jdbc:mysql://hadoop001:3306/mysql", "db", connectionProperties)
// Specifying create table column data types on write
jdbcDF.write
.option("createTableColumnTypes", "name CHAR(64), comments VARCHAR(1024)")
.jdbc("jdbc:mysql://hadoop001:3306/mysql", "db", connectionProperties)

The above is the full content of this article. I hope it will be helpful for everyone’s study. I also hope that everyone will support 123WORDPRESS.COM.
