DataFrame is an API introduced in Spark 1.3.0 that lets Spark process large-scale structured data. It is easier to use than working with RDD transformations directly, and its computing performance is reportedly about twice as fast. Spark can convert an RDD into a DataFrame in both offline batch jobs and real-time computing, and the data can then be manipulated with simple SQL statements. For anyone familiar with SQL, this makes transforming and filtering data very convenient, and it also enables higher-level applications. For example, in a real-time job, a Kafka topic name and a SQL statement can be passed in as configuration; the backend reads the configured content fields, maps them onto a class via reflection, and uses the input and output SQL to compute results over the streaming data. With such a setup, people who do not know Spark Streaming can still enjoy the benefits of real-time computing.

The following example reads a local file into an RDD, implicitly converts it into a DataFrame, queries the data, and finally appends the result to a MySQL table. The Scala code is as follows:

import java.sql.Timestamp
import org.apache.spark.sql.{SaveMode, SQLContext}
import org.apache.spark.{SparkContext, SparkConf}

object DataFrameSql {

  case class memberbase(data_date: Long, memberid: String, createtime: Timestamp, sp: Int) extends Serializable {
    override def toString: String = "%d\t%s\t%s\t%d".format(data_date, memberid, createtime, sp)
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setMaster("local[2]")
    // ----------------------
    // spark.sql.autoBroadcastJoinThreshold: maximum size of a table that will be broadcast in a join; the default is 10 MB, set to -1 to disable broadcasting
    // spark.sql.codegen: whether to compile SQL into Java bytecode at runtime; helps long or frequently executed queries
    // spark.sql.inMemoryColumnarStorage.batchSize: number of rows processed per batch for columnar caching; watch out for OOM
    // spark.sql.inMemoryColumnarStorage.compressed: whether in-memory columnar storage is compressed
    // ----------------------
    conf.set("spark.sql.shuffle.partitions", "20") // the default number of shuffle partitions is 200
    conf.setAppName("dataframe test")
    val sc = new SparkContext(conf)
    val sqc = new SQLContext(sc)
    val ac = sc.accumulator(0, "fail nums")
    val file = sc.textFile("src\\main\\resources\\000000_0")
    // The fields are assumed to be tab-separated (the data comes from Hive; see the toString format above)
    val log = file.map(lines => lines.split("\t"))
      .filter(line =>
        if (line.length != 4) { // a simple filter: count and drop malformed lines
          ac.add(1)
          false
        } else true)
      .map(line => memberbase(line(0).toLong, line(1), Timestamp.valueOf(line(2)), line(3).toInt))

    // Method 1: use implicit conversion
    import sqc.implicits._
    val dftemp = log.toDF() // conversion

    /* Method 2: use the createDataFrame method, which obtains the fields and their types via reflection
    val dftemp = sqc.createDataFrame(log)
    */

    dftemp.registerTempTable("memberbaseinfo")

    /* val sqlcommand = "select date_format(createtime,'yyyy-MM') as mm,count(1) as nums " +
      "from memberbaseinfo group by date_format(createtime,'yyyy-MM') " +
      "order by nums desc,mm asc" */
    val sqlcommand = "select * from memberbaseinfo"

    val prop = new java.util.Properties
    prop.setProperty("user", "etl")
    prop.setProperty("password", "xxx")
    // Call DataFrameWriter to write the data to MySQL; the target table does not need to exist beforehand
    sqc.sql(sqlcommand).write.mode(SaveMode.Append)
      .jdbc("jdbc:mysql://localhost:3306/test", "t_spark_dataframe_test", prop)

    println(ac.name.get + " " + ac.value)
    sc.stop()
  }
}
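As a quick sanity check after the job finishes, the appended rows can be read back through the same JDBC connection. This is a minimal sketch, not part of the original example; it assumes the same sqc, URL, and credentials as above, and that the MySQL JDBC driver is on the classpath:

    // Read the MySQL table back into a DataFrame using the same connection properties
    val prop = new java.util.Properties
    prop.setProperty("user", "etl")
    prop.setProperty("password", "xxx")
    val readBack = sqc.read.jdbc("jdbc:mysql://localhost:3306/test", "t_spark_dataframe_test", prop)
    readBack.registerTempTable("t_spark_dataframe_test")
    // Count the rows that have been appended so far
    sqc.sql("select count(1) as nums from t_spark_dataframe_test").show()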
The sample data read by textFile in the code above is shown below; it was exported from Hive. The fields are partition number, user ID, registration time, and third-party number:

20160309 45386477 2012-06-12 20:13:15 901438
20160309 45390977 2012-06-12 22:38:06 901036
20160309 45446677 2012-06-14 21:57:39 901438
20160309 45464977 2012-06-15 13:42:55 901438
20160309 45572377 2012-06-18 14:55:03 902606
20160309 45620577 2012-06-20 00:21:09 902606
20160309 45628377 2012-06-20 10:48:05 901181
20160309 45628877 2012-06-20 11:10:15 902606
20160309 45667777 2012-06-21 18:58:34 902524
20160309 45680177 2012-06-22 01:49:55
20160309 45687077 2012-06-22 11:23:22 902607

Note the field type mapping here, that is, how the case class fields are mapped to DataFrame column types (the full mapping table is in the official documentation). For this example: Long maps to LongType, String to StringType, java.sql.Timestamp to TimestampType, and Int to IntegerType.
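The screenshot of that mapping table is not reproduced here, but as a rough illustration, the schema Spark infers from the memberbase case class is equivalent to the one built explicitly below. This sketch is not from the original article; it reuses the log RDD and sqc from the code above and shows the programmatic StructType alternative to reflection:

    import org.apache.spark.sql.Row
    import org.apache.spark.sql.types._

    // Explicit equivalent of the schema inferred from memberbase:
    // Long -> LongType, String -> StringType, java.sql.Timestamp -> TimestampType, Int -> IntegerType
    val schema = StructType(Seq(
      StructField("data_date", LongType, nullable = false),
      StructField("memberid", StringType, nullable = true),
      StructField("createtime", TimestampType, nullable = true),
      StructField("sp", IntegerType, nullable = false)))

    // Build the DataFrame from an RDD[Row] plus the explicit schema
    val rowRDD = log.map(m => Row(m.data_date, m.memberid, m.createtime, m.sp))
    val dfExplicit = sqc.createDataFrame(rowRDD, schema)
    dfExplicit.printSchema() // prints the same column names and types as log.toDF()

Building the schema explicitly like this is useful when the columns are only known at runtime, for instance when they come from configuration, as in the Kafka scenario mentioned at the beginning.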
For more details, see the official Spark SQL and DataFrame Guide. The above example of converting a Spark RDD to a DataFrame and writing it into MySQL is all I want to share with you. I hope it can give you a reference, and I also hope that you will support 123WORDPRESS.COM.