Example of converting a Spark RDD to a DataFrame and writing it to MySQL

DataFrame is an API introduced in Spark 1.3.0 that lets Spark process large-scale structured data. It is easier to use than writing the equivalent RDD transformations by hand, and its computing performance is reportedly about twice as fast. Spark can convert an RDD into a DataFrame in offline batch processing or in real-time computation, and the data can then be manipulated with simple SQL statements. For people who are familiar with SQL, this makes transforming and filtering data very convenient, and it also enables higher-level applications. For example, in a real-time job the Kafka topic name and a SQL statement can be passed in as configuration; the background job reads the configured content fields, reflects them into a class, and uses the configured input and output SQL to compute results over the streaming data. With such a setup, people who do not know Spark Streaming can still enjoy the benefits of real-time computation.
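
To make that Kafka idea concrete, here is a minimal sketch of the pattern, assuming Spark 1.5+ with the spark-streaming-kafka (Kafka 0.8) integration on the classpath. The broker address, topic name, field layout and SQL text below are placeholders for values that would come from external configuration, and the Event case class stands in for the class that would normally be built by reflection from the configured fields.

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.sql.SQLContext
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

object ConfiguredSqlStreaming {
  // Stand-in for the class that would be generated by reflection from configuration
  case class Event(memberid: String, sp: Int)

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("configured sql streaming")
    val ssc = new StreamingContext(conf, Seconds(10))

    // Placeholder configuration values; in the scenario described above these come from a config store
    val brokers = "localhost:9092"
    val topic = "member_events"
    val configuredSql = "select sp, count(1) as nums from events group by sp"

    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, Map("metadata.broker.list" -> brokers), Set(topic))

    stream.foreachRDD { rdd =>
      val sqc = SQLContext.getOrCreate(rdd.sparkContext)
      import sqc.implicits._
      // Parse each Kafka message value, register the micro-batch as a temp table and run the configured SQL
      val df = rdd.map(_._2.split("\t")).filter(_.length == 2)
        .map(f => Event(f(0), f(1).toInt)).toDF()
      df.registerTempTable("events")
      sqc.sql(configuredSql).show()
    }

    ssc.start()
    ssc.awaitTermination()
  }
}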

The following example reads a local file into an RDD, implicitly converts it to a DataFrame, queries the data with SQL, and finally writes the result to a MySQL table in append mode. The Scala code is as follows:

import java.sql.Timestamp
import org.apache.spark.sql.{SaveMode, SQLContext}
import org.apache.spark.{SparkContext, SparkConf}

object DataFrameSql {
  case class memberbase(data_date: Long, memberid: String, createtime: Timestamp, sp: Int) extends Serializable {
    override def toString: String = "%d\t%s\t%s\t%d".format(data_date, memberid, createtime, sp)
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setMaster("local[2]")
    // ----------------------
    // spark.sql.autoBroadcastJoinThreshold sets the table size below which a table is broadcast in joins; the default is 10M, set to -1 to disable
    // spark.sql.codegen controls whether SQL is precompiled into Java bytecode; it helps long or frequently executed SQL
    // spark.sql.inMemoryColumnarStorage.batchSize sets the number of rows processed per batch; be careful of OOM
    // spark.sql.inMemoryColumnarStorage.compressed sets whether the in-memory columnar storage is compressed
    // ----------------------
    conf.set("spark.sql.shuffle.partitions", "20") // the default number of shuffle partitions is 200
    conf.setAppName("dataframe test")
    val sc = new SparkContext(conf)
    val sqc = new SQLContext(sc)
    val ac = sc.accumulator(0, "fail nums")
    val file = sc.textFile("src\\main\\resources\\000000_0")
    val log = file.map(lines => lines.split(" ")) // adjust the delimiter to match the actual file format
      .filter(line =>
        if (line.length != 4) { // a simple filter: count and drop malformed lines
          ac.add(1)
          false
        } else true)
      .map(line => memberbase(line(0).toLong, line(1), Timestamp.valueOf(line(2)), line(3).toInt))

    // Method 1: use implicit conversion
    import sqc.implicits._
    val dftemp = log.toDF() // conversion
    /*
      Method 2: use the createDataFrame method, which obtains the fields and their types via reflection
      val dftemp = sqc.createDataFrame(log)
    */
    dftemp.registerTempTable("memberbaseinfo")
    /*val sqlcommand = "select date_format(createtime,'yyyy-MM') as mm,count(1) as nums " +
      "from memberbaseinfo group by date_format(createtime,'yyyy-MM') " +
      "order by nums desc,mm asc "*/
    val sqlcommand = "select * from memberbaseinfo"
    val sel = sqc.sql(sqlcommand)

    val prop = new java.util.Properties
    prop.setProperty("user", "etl")
    prop.setProperty("password", "xxx")
    // Call DataFrameWriter to write the data to MySQL; the target table may not exist yet
    sel.write.mode(SaveMode.Append).jdbc("jdbc:mysql://localhost:3306/test", "t_spark_dataframe_test", prop)
    println(ac.name.get + " " + ac.value)
    sc.stop()
  }
}
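
One practical note: writing through write.jdbc requires the MySQL JDBC driver on the classpath. A minimal build.sbt sketch is shown below, assuming the project is built with sbt; the version numbers are illustrative and should be matched to your own Spark and MySQL installations.

// build.sbt (sketch) -- the version numbers are assumptions, not requirements
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "1.6.3",
  "org.apache.spark" %% "spark-sql" % "1.6.3",
  "mysql" % "mysql-connector-java" % "5.1.38"
)

Because the write uses SaveMode.Append, the JDBC writer typically creates t_spark_dataframe_test if it does not already exist (as the comment in the code notes); if you want specific column types or indexes on the MySQL side, create the table beforehand and the append will simply insert into it.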

The sample data read by textFile in the code above is shown below. The data comes from Hive; the fields are partition number, user id, registration time, and third-party number.

20160309 45386477 2012-06-12 20:13:15 901438
20160309 45390977 2012-06-12 22:38:06 901036
20160309 45446677 2012-06-14 21:57:39 901438
20160309 45464977 2012-06-15 13:42:55 901438
20160309 45572377 2012-06-18 14:55:03 902606
20160309 45620577 2012-06-20 00:21:09 902606
20160309 45628377 2012-06-20 10:48:05 901181
20160309 45628877 2012-06-20 11:10:15 902606
20160309 45667777 2012-06-21 18:58:34 902524
20160309 45680177 2012-06-22 01:49:55 
20160309 45687077 2012-06-22 11:23:22 902607

Note the field type mapping here, that is, how the case class fields map to DataFrame column types; the mapping table from the official website is not reproduced here.
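
A quick way to check the inferred mapping is to print the schema of the converted DataFrame. For the memberbase case class above, output along these lines is expected (Long becomes a long column, String a string column, java.sql.Timestamp a timestamp column, and Int an integer column):

dftemp.printSchema()
// Expected output (sketch):
// root
//  |-- data_date: long (nullable = false)
//  |-- memberid: string (nullable = true)
//  |-- createtime: timestamp (nullable = true)
//  |-- sp: integer (nullable = false)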

For more details, see the official Spark SQL and DataFrame Guide.

That is the whole example of converting a Spark RDD to a DataFrame and writing it to MySQL. I hope it can serve as a reference, and I also hope you will support 123WORDPRESS.COM.

You may also be interested in:
  • SparkSQL uses IDEA to quickly get started with DataFrame and DataSet
  • DataFrame: How to convert Scala class to DataFrame through SparkSql
  • Conversion examples between pyspark.sql.DataFrame and pandas.DataFrame
  • A brief discussion on the misunderstanding of DataFrame and SparkSql value
  • Spark SQL 2.4.8 Two ways to operate Dataframe
