How MLSQL Stack makes stream debugging easier

Preface

A colleague was looking into MLSQL Stack's support for streaming, and I mentioned that debugging streaming jobs is actually quite difficult. Through practice, we hoped to achieve the following three things:

  • View the latest N records in a Kafka topic at any time
  • Print debug output (the sink) to the web console
  • Let streaming programs infer the JSON schema automatically (Spark cannot do this at present)

After implementing these three things, I found that debugging really did become much simpler.

Process

First, I created a file named kaf_write.mlsql that writes test data to Kafka:

set abc='''
{ "x": 100, "y": 200, "z": 200 ,"dataType":"A group"}
{ "x": 120, "y": 100, "z": 260 ,"dataType":"B group"}
{ "x": 120, "y": 100, "z": 260 ,"dataType":"B group"}
{ "x": 120, "y": 100, "z": 260 ,"dataType":"B group"}
{ "x": 120, "y": 100, "z": 260 ,"dataType":"B group"}
{ "x": 120, "y": 100, "z": 260 ,"dataType":"B group"}
{ "x": 120, "y": 100, "z": 260 ,"dataType":"B group"}
{ "x": 120, "y": 100, "z": 260 ,"dataType":"B group"}
{ "x": 120, "y": 100, "z": 260 ,"dataType":"B group"}
{ "x": 120, "y": 100, "z": 260 ,"dataType":"B group"}
{ "x": 120, "y": 100, "z": 260 ,"dataType":"B group"}
''';
load jsonStr.`abc` as table1;

select to_json(struct(*)) as value from table1 as table2;
save append table2 as kafka.`wow` where 
kafka.bootstrap.servers="127.0.0.1:9092";

Now every run of this script writes the data to Kafka.

After writing, I want to check that the data really arrived and see what it looks like:
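To make the `to_json(struct(*)) as value` step more concrete, here is a small Python sketch of the same transformation: every row is packed into one JSON string, and that string becomes the `value` of a Kafka message. This is only an illustration, not how MLSQL Stack implements it, and the function name `rows_to_kafka_values` is made up for this example.

```python
import json

def rows_to_kafka_values(json_lines):
    """Parse newline-delimited JSON and wrap each row as {"value": <json string>}."""
    messages = []
    for line in json_lines.splitlines():
        line = line.strip()
        if not line:
            continue  # skip blank lines around the data
        row = json.loads(line)
        # Mirrors: select to_json(struct(*)) as value
        messages.append({"value": json.dumps(row)})
    return messages

sample = '''
{ "x": 100, "y": 200, "z": 200 ,"dataType":"A group"}
{ "x": 120, "y": 100, "z": 260 ,"dataType":"B group"}
'''
messages = rows_to_kafka_values(sample)
```

Each element of `messages` holds a single `value` field, which is the shape the Kafka sink expects for the message body.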

!kafkaTool sampleData 10 records from "127.0.0.1:9092" wow;

This command samples 10 records from the topic wow on the Kafka broker at 127.0.0.1:9092. The sampled records looked correct, so I then wrote a very simple streaming program:

-- the stream name; it should be unique.
set streamName="streamExample";

-- use kafkaTool to infer schema from kafka
!kafkaTool registerSchema 2 records from "127.0.0.1:9092" wow;


load kafka.`wow` options 
kafka.bootstrap.servers="127.0.0.1:9092"
as newkafkatable1;


select * from newkafkatable1
as table21;


-- print to the web console instead of the terminal console.
save append table21 
as webConsole.`` 
options mode="Append"
and duration="15"
and checkpointLocation="/tmp/s-cpl4";

The results are printed in the web console, and the same output can also be watched in real time in the terminal.
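The `registerSchema` step in the script infers a schema from a couple of sampled records. As a rough illustration of the idea only, here is a Python sketch that derives field types from sample JSON lines. It is not MLSQL Stack's actual implementation; `infer_type`, `infer_schema`, and the type names are assumptions made for this example.

```python
import json

def infer_type(value):
    # bool must be checked before int, because bool is a subclass of int
    if isinstance(value, bool):
        return "boolean"
    if isinstance(value, int):
        return "long"
    if isinstance(value, float):
        return "double"
    if value is None:
        return "null"
    # strings, and (in this simplified sketch) nested values, map to string
    return "string"

def infer_schema(json_lines):
    """Merge the field types seen across all sample records."""
    schema = {}
    for line in json_lines:
        record = json.loads(line)
        for field, value in record.items():
            new_type = infer_type(value)
            old_type = schema.get(field)
            if old_type is None or old_type == "null":
                schema[field] = new_type
            elif new_type not in ("null", old_type):
                # Conflict: widen long + double to double, otherwise fall back to string
                if {old_type, new_type} == {"long", "double"}:
                    schema[field] = "double"
                else:
                    schema[field] = "string"
    return schema

samples = [
    '{ "x": 100, "y": 200, "z": 200 ,"dataType":"A group"}',
    '{ "x": 120, "y": 100, "z": 260 ,"dataType":"B group"}',
]
schema = infer_schema(samples)
```

Sampling only a few records keeps inference cheap, at the cost of missing fields that do not appear in the sample.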

Supplement

MLSQL Stack has two other useful features for streaming: you can register HTTP callbacks for stream events, and you can process the stream's results with batch SQL before persisting them. The following script uses both:

-- the stream name; it should be unique.
set streamName="streamExample";


-- mock some data.
set data='''
{"key":"yes","value":"no","topic":"test","partition":0,"offset":0,"timestamp":"2008-01-24 18:01:01.001","timestampType":0}
{"key":"yes","value":"no","topic":"test","partition":0,"offset":1,"timestamp":"2008-01-24 18:01:01.002","timestampType":0}
{"key":"yes","value":"no","topic":"test","partition":0,"offset":2,"timestamp":"2008-01-24 18:01:01.003","timestampType":0}
{"key":"yes","value":"no","topic":"test","partition":0,"offset":3,"timestamp":"2008-01-24 18:01:01.003","timestampType":0}
{"key":"yes","value":"no","topic":"test","partition":0,"offset":4,"timestamp":"2008-01-24 18:01:01.003","timestampType":0}
{"key":"yes","value":"no","topic":"test","partition":0,"offset":5,"timestamp":"2008-01-24 18:01:01.003","timestampType":0}
''';

-- load data as table
load jsonStr.`data` as datasource;

-- convert the table to a stream source
load mockStream.`datasource` options 
stepSizeRange="0-3"
as newkafkatable1;

-- cast the value column to a string
select cast(value as string) as k from newkafkatable1
as table21;


!callback post "http://127.0.0.1:9002/api_v1/test" when "started,progress,terminated";
-- process the result with batch SQL and save it as parquet.


save append table21 
as custom.`` 
options mode="append"
and duration="15"
and sourceTable="jack"
and code='''
select count(*) as c from jack as newjack;
save append newjack as parquet.`/tmp/jack`; 
'''
and checkpointLocation="/tmp/cpl15";
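As I read this script, the `custom` sink exposes each micro-batch under the name given in `sourceTable` (here `jack`) and then runs the batch statements in `code` against it. The Python sketch below mimics that per-batch flow under that assumption. The function `run_custom_sink` and the batch contents are hypothetical, and a plain list stands in for the parquet output.

```python
def run_custom_sink(micro_batches):
    """For each micro-batch, compute count(*) and append the result row to storage."""
    stored = []  # stands in for parquet.`/tmp/jack`
    for jack in micro_batches:
        newjack = {"c": len(jack)}  # select count(*) as c from jack as newjack
        stored.append(newjack)      # save append newjack as parquet.`/tmp/jack`
    return stored

# Three made-up micro-batches of the string column k
batches = [["no", "no"], ["no"], ["no", "no", "no"]]
result = run_custom_sink(batches)
```

Because the save mode is append, every trigger adds one more count row instead of overwriting the previous one.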
