Preface

A colleague was evaluating MLSQL Stack's support for streaming, and I mentioned that debugging streaming jobs is actually quite difficult. Through practice, we hoped to achieve the following three points:
After implementing these three points, I found that debugging did become much simpler.

Process

First, I created a kaf_write.mlsql file to make it easy to write data to Kafka:

set abc='''
{ "x": 100, "y": 200, "z": 200 ,"dataType":"A group"}
{ "x": 120, "y": 100, "z": 260 ,"dataType":"B group"}
{ "x": 120, "y": 100, "z": 260 ,"dataType":"B group"}
{ "x": 120, "y": 100, "z": 260 ,"dataType":"B group"}
{ "x": 120, "y": 100, "z": 260 ,"dataType":"B group"}
{ "x": 120, "y": 100, "z": 260 ,"dataType":"B group"}
{ "x": 120, "y": 100, "z": 260 ,"dataType":"B group"}
{ "x": 120, "y": 100, "z": 260 ,"dataType":"B group"}
{ "x": 120, "y": 100, "z": 260 ,"dataType":"B group"}
{ "x": 120, "y": 100, "z": 260 ,"dataType":"B group"}
{ "x": 120, "y": 100, "z": 260 ,"dataType":"B group"}
''';
load jsonStr.`abc` as table1;
select to_json(struct(*)) as value from table1 as table2;
save append table2 as kafka.`wow` where kafka.bootstrap.servers="127.0.0.1:9092";

This way, every run of the script writes the data to Kafka. Once the write has finished, I need to check whether the data really arrived and what it looks like:
This statement means that I want to sample 10 records from Kafka. The Kafka address is 127.0.0.1:9092 and the topic is wow. The running results are as follows:

There's no problem. Then I wrote a very simple streaming program:

-- the stream name, should be unique.
set streamName="streamExample";

-- use kafkaTool to infer schema from kafka
!kafkaTool registerSchema 2 records from "127.0.0.1:9092" wow;

load kafka.`wow` options kafka.bootstrap.servers="127.0.0.1:9092"
as newkafkatable1;

select * from newkafkatable1
as table21;

-- print in webConsole instead of terminal console.
save append table21
as webConsole.``
options mode="Append"
and duration="15"
and checkpointLocation="/tmp/s-cpl4";

The results are as follows:

We can also see the real-time effect in the terminal.

Supplement

Of course, MLSQL Stack has two other great features for streaming. The first is that you can set HTTP callbacks for streaming events, and use batch SQL to process the streaming results before finally storing them in the database. See the following script:

-- the stream name, should be unique.
set streamName="streamExample";

-- mock some data.
set data='''
{"key":"yes","value":"no","topic":"test","partition":0,"offset":0,"timestamp":"2008-01-24 18:01:01.001","timestampType":0}
{"key":"yes","value":"no","topic":"test","partition":0,"offset":1,"timestamp":"2008-01-24 18:01:01.002","timestampType":0}
{"key":"yes","value":"no","topic":"test","partition":0,"offset":2,"timestamp":"2008-01-24 18:01:01.003","timestampType":0}
{"key":"yes","value":"no","topic":"test","partition":0,"offset":3,"timestamp":"2008-01-24 18:01:01.003","timestampType":0}
{"key":"yes","value":"no","topic":"test","partition":0,"offset":4,"timestamp":"2008-01-24 18:01:01.003","timestampType":0}
{"key":"yes","value":"no","topic":"test","partition":0,"offset":5,"timestamp":"2008-01-24 18:01:01.003","timestampType":0}
''';

-- load data as table
load jsonStr.`data` as datasource;

-- convert table as stream source
load mockStream.`datasource` options
stepSizeRange="0-3"
as newkafkatable1;

-- aggregation
select cast(value as string) as k from newkafkatable1
as table21;

!callback post "http://127.0.0.1:9002/api_v1/test" when "started,progress,terminated";

-- output the result to console.
save append table21
as custom.``
options mode="append"
and duration="15"
and sourceTable="jack"
and code='''
select count(*) as c from jack as newjack;
save append newjack as parquet.`/tmp/jack`;
'''
and checkpointLocation="/tmp/cpl15";

Summary

The above is the full content of this article. I hope it has some reference value for your study or work. Thank you for your support of 123WORDPRESS.COM.
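To try the !callback line from the supplementary script without a real service behind it, something has to be listening on 127.0.0.1:9002. The following is a minimal sketch of such a receiver in Python, using only the standard library. The address and the /api_v1/test path come from the script; everything else (the names CallbackHandler, start_receiver, send_test_event and received) is hypothetical. The article does not describe the body format that MLSQL Stack posts, so the handler assumes nothing about it and simply stores the raw text.

```python
import threading
from http.server import BaseHTTPRequestHandler, HTTPServer
from urllib.request import Request, urlopen

CALLBACK_PATH = "/api_v1/test"  # path taken from the !callback line in the script
received = []                   # raw request bodies, in arrival order


class CallbackHandler(BaseHTTPRequestHandler):
    """Accepts POSTs on CALLBACK_PATH and records the body unparsed."""

    def do_POST(self):
        length = int(self.headers.get("Content-Length", 0))
        body = self.rfile.read(length).decode("utf-8", errors="replace")
        if self.path == CALLBACK_PATH:
            # The payload layout is an unknown here, so keep it as plain text.
            received.append(body)
            print("callback:", body)
            self.send_response(200)
        else:
            self.send_response(404)
        self.end_headers()

    def log_message(self, fmt, *args):
        pass  # suppress the default per-request access log


def start_receiver(host="127.0.0.1", port=9002):
    """Start the receiver in a background thread and return the server."""
    server = HTTPServer((host, port), CallbackHandler)
    threading.Thread(target=server.serve_forever, daemon=True).start()
    return server


def send_test_event(port, payload='{"event": "started"}'):
    """Post a made-up payload to the receiver; returns the HTTP status code."""
    req = Request(
        f"http://127.0.0.1:{port}{CALLBACK_PATH}",
        data=payload.encode("utf-8"),
        method="POST",
    )
    with urlopen(req, timeout=5) as resp:
        return resp.status
```

One way to use it is to call start_receiver() in an interactive Python session before running the stream and keep that session open; each callback that arrives is printed and appended to received. send_test_event(9002) can be used beforehand to confirm the receiver is reachable. Its payload is invented for the check and is not the format MLSQL Stack sends.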