Generic load/write methods

Manually specify options

Spark SQL's DataFrame interface supports operations on multiple data sources. A DataFrame can be operated on in much the same way as an RDD, and it can also be registered as a temporary table; once registered, you can run SQL queries against it. The default data source for Spark SQL is Parquet, so when the data source is a Parquet file, Spark SQL can perform all of these operations without any extra configuration. You can change the default data source format by modifying the configuration item spark.sql.sources.default.

scala> val df = spark.read.load("hdfs://hadoop001:9000/namesAndAges.parquet")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

scala> df.select("name").write.save("names.parquet")

When the data source is not a Parquet file, you need to specify the format manually. You can give the fully qualified name of the data source (for example: org.apache.spark.sql.parquet); for built-in formats you only need the short name: json, parquet, jdbc, orc, libsvm, csv, or text. Use the read.load method provided by SparkSession for generic loading of data, and use write together with save to save data.

scala> val peopleDF = spark.read.format("json").load("hdfs://hadoop001:9000/people.json")
peopleDF: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

scala> peopleDF.write.format("parquet").save("hdfs://hadoop001:9000/namesAndAges.parquet")

In addition, you can run SQL directly on the file:

val sqlDF = spark.sql("SELECT * FROM parquet.`hdfs://hadoop001:9000/namesAndAges.parquet`")
sqlDF.show()

File saving options

SaveMode defines how data is handled when it is stored. It is important to note that these save modes do not use any locking and are not atomic operations. In addition, when Overwrite is used, the original data is deleted before the new data is written out. SaveMode is described in detail in the following table:

Scala/Java | Any language | Meaning
SaveMode.ErrorIfExists (default) | "error" (default) | Throw an exception if data already exists at the target
SaveMode.Append | "append" | Append the new data to the existing data
SaveMode.Overwrite | "overwrite" | Overwrite (delete and replace) the existing data
SaveMode.Ignore | "ignore" | Skip the write if data already exists
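As a brief illustration of these modes (a sketch added here, not from the original article; it reuses the peopleDF and path from the example above), the mode can be set on the writer either with the SaveMode enum or with its string form:

import org.apache.spark.sql.SaveMode

// Append new rows to an existing Parquet directory instead of failing
peopleDF.write.mode(SaveMode.Append).parquet("hdfs://hadoop001:9000/namesAndAges.parquet")

// String form: silently skip the write if the target already exists
peopleDF.write.mode("ignore").format("parquet").save("hdfs://hadoop001:9000/namesAndAges.parquet")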
Parquet Files

Parquet read and write

The Parquet format is widely used in the Hadoop ecosystem, and it supports all of the data types of Spark SQL. Spark SQL provides methods to read and write Parquet files directly.

// Encoders for most common types are automatically provided by importing spark.implicits._
import spark.implicits._
val peopleDF = spark.read.json("examples/src/main/resources/people.json")

// DataFrames can be saved as Parquet files, maintaining the schema information
peopleDF.write.parquet("hdfs://hadoop001:9000/people.parquet")

// Read in the Parquet file created above.
// Parquet files are self-describing, so the schema is preserved.
// The result of loading a Parquet file is also a DataFrame.
val parquetFileDF = spark.read.parquet("hdfs://hadoop001:9000/people.parquet")

// Parquet files can also be used to create a temporary view and then used in SQL statements
parquetFileDF.createOrReplaceTempView("parquetFile")
val namesDF = spark.sql("SELECT name FROM parquetFile WHERE age BETWEEN 13 AND 19")
namesDF.map(attributes => "Name: " + attributes(0)).show()
// +------------+
// |       value|
// +------------+
// |Name: Justin|
// +------------+

Parsing partition information

Partitioning a table is one way to optimize data. In a partitioned table, data is stored in different directories named after the partition columns. Parquet data sources can automatically discover and parse this partition information. For example, to partition population data by the gender and country columns, use the following directory structure:

path
└── to
    └── table
        ├── gender=male
        │   ├── ...
        │   │
        │   ├── country=US
        │   │   └── data.parquet
        │   ├── country=CN
        │   │   └── data.parquet
        │   └── ...
        └── gender=female
            ├── ...
            │
            ├── country=US
            │   └── data.parquet
            ├── country=CN
            │   └── data.parquet
            └── ...

By passing path/to/table to SQLContext.read.parquet or SQLContext.read.load, Spark SQL will automatically resolve the partition information. The schema of the returned DataFrame is as follows:

root
|-- name: string (nullable = true)
|-- age: long (nullable = true)
|-- gender: string (nullable = true)
|-- country: string (nullable = true)

Note that the data types of the partition columns are parsed automatically; currently, numeric and string types are supported. Automatic parsing of partition column types is controlled by the parameter spark.sql.sources.partitionColumnTypeInference.enabled, whose default value is true. If you want to disable this behavior, set the parameter to false; the partition columns will then default to the string type and no type inference will be performed.

Schema Merge

Like ProtocolBuffer, Avro, and Thrift, Parquet also supports schema evolution. Users can start with a simple schema and gradually add column descriptions to it, ending up with multiple Parquet files whose schemas differ but remain mutually compatible. The Parquet data source can detect this situation automatically and merge the schemas of these files. Because schema merging is a costly operation and is not necessary in most cases, Spark SQL has disabled this feature by default since 1.5.0. You can enable it in two ways: set the data source option mergeSchema to true when reading Parquet files, or set the global SQL option spark.sql.parquet.mergeSchema to true.
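As a quick sketch of the global-option approach (added here, not from the original article; it assumes the running SparkSession named spark used throughout), these options can be set on the session configuration:

// Enable Parquet schema merging globally for this session
spark.conf.set("spark.sql.parquet.mergeSchema", "true")

// Optionally disable partition column type inference, keeping partition values as strings
spark.conf.set("spark.sql.sources.partitionColumnTypeInference.enabled", "false")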
// sqlContext from the previous example is used in this example.
// This is used to implicitly convert an RDD to a DataFrame.
import spark.implicits._

// Create a simple DataFrame, stored into a partition directory
val df1 = sc.makeRDD(1 to 5).map(i => (i, i * 2)).toDF("single", "double")
df1.write.parquet("hdfs://hadoop001:9000/data/test_table/key=1")

// Create another DataFrame in a new partition directory,
// adding a new column and dropping an existing column
val df2 = sc.makeRDD(6 to 10).map(i => (i, i * 3)).toDF("single", "triple")
df2.write.parquet("hdfs://hadoop001:9000/data/test_table/key=2")

// Read the partitioned table
val df3 = spark.read.option("mergeSchema", "true").parquet("hdfs://hadoop001:9000/data/test_table")
df3.printSchema()

// The final schema consists of all 3 columns in the Parquet files together
// with the partitioning column that appears in the partition directory paths.
// root
// |-- single: int (nullable = true)
// |-- double: int (nullable = true)
// |-- triple: int (nullable = true)
// |-- key: int (nullable = true)

Hive Data Source

Apache Hive is the SQL engine on Hadoop, and Spark SQL can be compiled with or without Hive support. Spark SQL with Hive support can access Hive tables, use UDFs (user-defined functions), run the Hive query language (HiveQL/HQL), and so on. One point worth emphasizing is that including the Hive libraries in Spark SQL does not require Hive to be installed in advance. Generally speaking, it is best to compile Spark SQL with Hive support so that all of these features are available; if you downloaded a binary distribution of Spark, it should already have been built with Hive support.

To connect Spark SQL to an existing Hive deployment, you must copy hive-site.xml to the Spark configuration directory ($SPARK_HOME/conf). Spark SQL can still run even if Hive is not deployed; in that case it creates its own Hive metastore, named metastore_db, in the current working directory. Additionally, if you create tables with the HiveQL CREATE TABLE statement (not CREATE EXTERNAL TABLE), the tables are placed in the /user/hive/warehouse directory on your default file system (HDFS if you have an hdfs-site.xml configured on your classpath, otherwise the local file system).

import java.io.File
import org.apache.spark.sql.Row
import org.apache.spark.sql.SparkSession

case class Record(key: Int, value: String)

// warehouseLocation points to the default location for managed databases and tables
val warehouseLocation = new File("spark-warehouse").getAbsolutePath

val spark = SparkSession
  .builder()
  .appName("Spark Hive Example")
  .config("spark.sql.warehouse.dir", warehouseLocation)
  .enableHiveSupport()
  .getOrCreate()

import spark.implicits._
import spark.sql

sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")

// Queries are expressed in HiveQL
sql("SELECT * FROM src").show()
// +---+-------+
// |key|  value|
// +---+-------+
// |238|val_238|
// | 86| val_86|
// |311|val_311|
// ...

// Aggregation queries are also supported.
sql("SELECT COUNT(*) FROM src").show()
// +--------+
// |count(1)|
// +--------+
// |     500|
// +--------+

// The results of SQL queries are themselves DataFrames and support all normal functions.
val sqlDF = sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key")

// The items in DataFrames are of type Row, which allows you to access each column by ordinal.
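// Added aside (a hedged sketch, not part of the original example): columns can also be
// read by name with Row.getAs instead of by position, for example:
val stringsByName = sqlDF.map(row => s"Key: ${row.getAs[Int]("key")}, Value: ${row.getAs[String]("value")}")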
val stringsDS = sqlDF.map {
  case Row(key: Int, value: String) => s"Key: $key, Value: $value"
}
stringsDS.show()
// +--------------------+
// |               value|
// +--------------------+
// |Key: 0, Value: val_0|
// |Key: 0, Value: val_0|
// |Key: 0, Value: val_0|
// ...

// You can also use DataFrames to create temporary views within a SparkSession.
val recordsDF = spark.createDataFrame((1 to 100).map(i => Record(i, s"val_$i")))
recordsDF.createOrReplaceTempView("records")

// Queries can then join DataFrame data with data stored in Hive.
sql("SELECT * FROM records r JOIN src s ON r.key = s.key").show()
// +---+------+---+------+
// |key| value|key| value|
// +---+------+---+------+
// |  2| val_2|  2| val_2|
// |  4| val_4|  4| val_4|
// |  5| val_5|  5| val_5|
// ...

Embedded Hive Application

If you want to use the built-in Hive, you do not need to do anything; just use it directly. The warehouse location can be specified when submitting the application:

--conf spark.sql.warehouse.dir=<warehouse directory>

Note: when using the internal Hive, since Spark 2.0 the property spark.sql.warehouse.dir specifies the location of the data warehouse. If you want to use an HDFS path, you need to add core-site.xml and hdfs-site.xml to the Spark conf directory; otherwise only a warehouse directory on the master node is created, and queries will fail with file-not-found errors. In that case, switch to HDFS, delete the old metastore, and restart the cluster.

External Hive Application

If you want to connect to an externally deployed Hive, complete the following steps:

a. Copy or soft-link Hive's hive-site.xml into the conf directory under the Spark installation directory.
b. Open the Spark shell, bringing along the JDBC driver for accessing the Hive metastore.

$ bin/spark-shell --master spark://hadoop001:7077 --jars mysql-connector-java-5.1.27-bin.jar

JSON Dataset

Spark SQL can automatically infer the schema of a JSON dataset and load it as a Dataset[Row]. You can use SparkSession.read.json() to load either a Dataset[String] or a JSON file. Note that the expected input is not a conventional JSON file: each line must be a complete, self-contained JSON object.

{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}
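If you do need to read a conventional, pretty-printed JSON document that spans multiple lines, Spark 2.2 and later can parse it with the multiLine read option. A brief sketch (the file name here is hypothetical, not from the original article):

// Parse a file containing a single multi-line JSON document (hypothetical path)
val multiLinePeopleDF = spark.read.option("multiLine", true).json("examples/src/main/resources/people_multiline.json")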
// Primitive types (Int, String, etc) and Product types (case classes) encoders are
// supported by importing this when creating a Dataset.
import spark.implicits._

// A JSON dataset is pointed to by path.
// The path can be either a single text file or a directory storing text files
val path = "examples/src/main/resources/people.json"
val peopleDF = spark.read.json(path)

// The inferred schema can be visualized using the printSchema() method
peopleDF.printSchema()
// root
// |-- age: long (nullable = true)
// |-- name: string (nullable = true)

// Creates a temporary view using the DataFrame
peopleDF.createOrReplaceTempView("people")

// SQL statements can be run by using the sql methods provided by spark
val teenagerNamesDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19")
teenagerNamesDF.show()
// +------+
// |  name|
// +------+
// |Justin|
// +------+

// Alternatively, a DataFrame can be created for a JSON dataset represented by
// a Dataset[String] storing one JSON object per string
val otherPeopleDataset = spark.createDataset(
  """{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}""" :: Nil)
val otherPeople = spark.read.json(otherPeopleDataset)
otherPeople.show()
// +---------------+----+
// |        address|name|
// +---------------+----+
// |[Columbus,Ohio]| Yin|
// +---------------+----+

JDBC

Spark SQL can create a DataFrame by reading data from a relational database over JDBC. After performing computations on the DataFrame, the results can be written back to the relational database. Note that the relevant database driver must be on the Spark classpath.

$ bin/spark-shell --master spark://hadoop001:7077 --jars mysql-connector-java-5.1.27-bin.jar

// Note: JDBC loading and saving can be achieved via either the load/save or jdbc methods

// Loading data from a JDBC source
val jdbcDF = spark.read
  .format("jdbc")
  .option("url", "jdbc:mysql://hadoop001:3306/rdd")
  .option("dbtable", "rddtable")
  .option("user", "root")
  .option("password", "hive")
  .load()

import java.util.Properties

val connectionProperties = new Properties()
connectionProperties.put("user", "root")
connectionProperties.put("password", "hive")
val jdbcDF2 = spark.read
  .jdbc("jdbc:mysql://hadoop001:3306/rdd", "rddtable", connectionProperties)

// Saving data to a JDBC source
jdbcDF.write
  .format("jdbc")
  .option("url", "jdbc:mysql://hadoop001:3306/rdd")
  .option("dbtable", "rddtable2")
  .option("user", "root")
  .option("password", "hive")
  .save()

jdbcDF2.write
  .jdbc("jdbc:mysql://hadoop001:3306/mysql", "db", connectionProperties)

// Specifying create table column data types on write
jdbcDF.write
  .option("createTableColumnTypes", "name CHAR(64), comments VARCHAR(1024)")
  .jdbc("jdbc:mysql://hadoop001:3306/mysql", "db", connectionProperties)
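Large tables can also be read in parallel by telling Spark how to split a numeric column into partitions. A minimal sketch (added here, not from the original article), reusing the connection settings above and assuming a hypothetical numeric id column in rddtable:

// Read rddtable in 10 partitions, splitting on the (hypothetical) numeric column "id"
val partitionedJdbcDF = spark.read.jdbc(
  "jdbc:mysql://hadoop001:3306/rdd",
  "rddtable",
  columnName = "id",
  lowerBound = 1L,
  upperBound = 100000L,
  numPartitions = 10,
  connectionProperties)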
The above is the full content of this article. I hope it will be helpful for everyone's study, and I also hope that everyone will support 123WORDPRESS.COM.