Parsing Apache Avro Data in One Article

Abstract: This article demonstrates how to generate Avro data through serialization and how to parse it with FlinkSQL.

Avro official documentation: http://avro.apache.org/docs/current/index.html

Introduction to Avro

Avro is a data serialization system.

It provides:

  • Rich data structures
  • Compact, fast, binary data format
  • A file format for storing persistent data
  • Remote Procedure Call (RPC) system
  • Simple integration with dynamic languages. Code generation is not required to read or write data files, nor to use or implement the RPC protocol; code generation is an optional optimization that only pays off for statically typed languages.

Technical Background

With the rapid development of the Internet, technologies such as cloud computing, big data, artificial intelligence (AI), and the Internet of Things have become mainstream. E-commerce websites, face recognition, driverless cars, smart homes, and smart cities not only make everyday life more convenient; they also mean that vast amounts of data are constantly being collected, cleaned, and analyzed by all kinds of platforms. Guaranteeing low latency, high throughput, and security for this data is therefore especially important. Apache Avro serializes data against a schema for binary transmission, which promotes both fast transmission and data security, and Avro is seeing ever wider use across industries, so knowing how to process and parse Avro data matters correspondingly. This article demonstrates how to generate Avro data through serialization and how to parse it using FlinkSQL.

Note: this article is a demo of Avro parsing. At the time of writing, FlinkSQL is only suitable for parsing simple Avro data; complex nested Avro data is not yet supported.

Scenario Introduction

This article mainly introduces the following three key points:

  • How to serialize and generate Avro data
  • How to deserialize and parse Avro data
  • How to parse Avro data using FlinkSQL

Prerequisites

  • To learn more about Avro, refer to the quick start guide on the Apache Avro official website
  • Understand Avro application scenarios

Procedure

1. Create a new Avro Maven project and configure the pom dependencies

The contents of the pom file are as follows:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.huawei.bigdata</groupId>
    <artifactId>avrodemo</artifactId>
    <version>1.0-SNAPSHOT</version>
    <dependencies>
        <dependency>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro</artifactId>
            <version>1.8.1</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.avro</groupId>
                <artifactId>avro-maven-plugin</artifactId>
                <version>1.8.1</version>
                <executions>
                    <execution>
                        <phase>generate-sources</phase>
                        <goals>
                            <goal>schema</goal>
                        </goals>
                        <configuration>
                            <sourceDirectory>${project.basedir}/src/main/avro/</sourceDirectory>
                            <outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.6</source>
                    <target>1.6</target>
                </configuration>
            </plugin>
        </plugins>
    </build>

</project>

Note: The pom file above configures the two paths used for code generation, ${project.basedir}/src/main/avro/ and ${project.basedir}/src/main/java/. With this configuration, running a mvn build makes the plugin generate class files from the .avsc schemas in the former directory and place them in the latter. If the avro directory does not exist, create it manually.
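
For reference, code generation can also be triggered from the command line; since the plugin is bound to the generate-sources phase, any later phase runs it as well:

mvn generate-sources
# or, as part of a normal build:
mvn compile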

2. Define the schema

Avro schemas are defined in JSON. A schema is built from primitive types (null, boolean, int, long, float, double, bytes, and string) and complex types (record, enum, array, map, union, and fixed). For example, the following defines a user schema. Create an avro directory under src/main, then create a new file user.avsc in that directory:

{"namespace": "lancoo.ecbdc.pre",
 "type": "record",
 "name": "User",
 "fields": [
     {"name": "name", "type": "string"},
     {"name": "favorite_number", "type": ["int", "null"]},
     {"name": "favorite_color", "type": ["string", "null"]}
 ]
} 
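
As an optional sanity check, the schema file can be parsed programmatically before wiring it into the build. This is a minimal sketch (the class name SchemaCheck is illustrative, not part of the original project):

import org.apache.avro.Schema;
import java.io.File;

public class SchemaCheck {
    public static void main(String[] args) throws Exception {
        // Parse user.avsc; an invalid schema throws SchemaParseException
        Schema schema = new Schema.Parser().parse(new File("src/main/avro/user.avsc"));
        System.out.println(schema.toString(true)); // pretty-print the parsed schema
    }
}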

3. Compile the schema

Run compile from the Maven Projects panel (or run mvn compile on the command line); this automatically creates the namespace package path and the generated User class code.

4. Serialization

Create a TestUser class to serialize and generate the data:

// Imports required by this snippet
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.specific.SpecificDatumWriter;
import java.io.File;

User user1 = new User();
user1.setName("Alyssa");
user1.setFavoriteNumber(256);
// Leave favorite color null

// Alternate constructor
User user2 = new User("Ben", 7, "red");

// Construct via builder
User user3 = User.newBuilder()
        .setName("Charlie")
        .setFavoriteColor("blue")
        .setFavoriteNumber(null)
        .build();

// Serialize user1, user2 and user3 to disk
DatumWriter<User> userDatumWriter = new SpecificDatumWriter<User>(User.class);
DataFileWriter<User> dataFileWriter = new DataFileWriter<User>(userDatumWriter);
dataFileWriter.create(user1.getSchema(), new File("user_generic.avro"));
dataFileWriter.append(user1);
dataFileWriter.append(user2);
dataFileWriter.append(user3);
dataFileWriter.close();

After the serialization program runs, the Avro data file is generated in the project's root directory.

user_generic.avro is a binary object container file: it starts with the magic bytes Obj, followed by the embedded writer schema, and then the binary-encoded records. Its contents begin as follows:

Objavro.schema {"type":"record","name":"User","namespace":"lancoo.ecbdc.pre","fields":[{"name":"name","type":"string"},{"name":"favorite_number","type":["int","null"]},{"name":"favorite_color","type":["string","null"]}]}
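
For completeness, the same kind of file can be written without generated classes by loading the schema at runtime and using GenericRecord. This is a minimal sketch under the user.avsc schema above (the class name, field values, and output file name are illustrative):

import org.apache.avro.Schema;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumWriter;
import java.io.File;

public class GenericWriteDemo {
    public static void main(String[] args) throws Exception {
        // Load the schema at runtime instead of using a generated User class
        Schema schema = new Schema.Parser().parse(new File("src/main/avro/user.avsc"));

        GenericRecord user = new GenericData.Record(schema);
        user.put("name", "Dana");
        user.put("favorite_number", 42);
        user.put("favorite_color", null); // the union with null permits this

        DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<GenericRecord>(schema);
        DataFileWriter<GenericRecord> fileWriter = new DataFileWriter<GenericRecord>(datumWriter);
        fileWriter.create(schema, new File("user_generic2.avro"));
        fileWriter.append(user);
        fileWriter.close();
    }
}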

5. Deserialization

Parse the Avro data with the following deserialization code:

// Imports required by this snippet
import org.apache.avro.file.DataFileReader;
import org.apache.avro.io.DatumReader;
import org.apache.avro.specific.SpecificDatumReader;
import java.io.File;

// Deserialize Users from disk
DatumReader<User> userDatumReader = new SpecificDatumReader<User>(User.class);
DataFileReader<User> dataFileReader = new DataFileReader<User>(new File("user_generic.avro"), userDatumReader);
User user = null;
while (dataFileReader.hasNext()) {
    // Reuse user object by passing it to next(). This saves us from
    // allocating and garbage collecting many objects for files with
    // many items.
    user = dataFileReader.next(user);
    System.out.println(user);
}
dataFileReader.close();

Execute the deserialization code to parse user_generic.avro; the three users are printed, confirming that the Avro data was parsed successfully.
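
When the generated User class is not available (as is effectively the case for engines such as FlinkSQL), the file can also be read generically, because every Avro container file embeds its writer schema. A minimal sketch (the class name is illustrative):

import org.apache.avro.file.DataFileReader;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;
import java.io.File;

public class GenericReadDemo {
    public static void main(String[] args) throws Exception {
        // No reader schema is supplied: the schema embedded in the file is used
        DatumReader<GenericRecord> datumReader = new GenericDatumReader<GenericRecord>();
        DataFileReader<GenericRecord> dataFileReader =
                new DataFileReader<GenericRecord>(new File("user_generic.avro"), datumReader);
        while (dataFileReader.hasNext()) {
            GenericRecord record = dataFileReader.next();
            System.out.println(record.get("name") + ": " + record.get("favorite_number"));
        }
        dataFileReader.close();
    }
}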

6. Upload user_generic.avro to an HDFS path

hdfs dfs -mkdir -p /tmp/lztest/
hdfs dfs -put user_generic.avro /tmp/lztest/ 
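
Optionally, verify the upload by listing the target directory:

hdfs dfs -ls /tmp/lztest/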

7. Configure FlinkServer

Prepare the Avro jar packages

Put flink-sql-avro-*.jar and flink-sql-avro-confluent-registry-*.jar into the FlinkServer lib directory, executing the following commands on all FlinkServer nodes:

cp /opt/huawei/Bigdata/FusionInsight_Flink_8.1.2/install/FusionInsight-Flink-1.12.2/flink/opt/flink-sql-avro*.jar /opt/huawei/Bigdata/FusionInsight_Flink_8.1.3/install/FusionInsight-Flink-1.12.2/flink/lib
chmod 500 flink-sql-avro*.jar
chown omm:wheel flink-sql-avro*.jar 

Then restart the FlinkServer instance and, after the restart, check whether the Avro packages have been uploaded:

hdfs dfs -ls /FusionInsight_FlinkServer/8.1.2-312005/lib

8. Write FlinkSQL

CREATE TABLE testHdfs (
  name String,
  favorite_number int,
  favorite_color String
) WITH (
  'connector' = 'filesystem',
  'path' = 'hdfs:///tmp/lztest/user_generic.avro',
  'format' = 'avro'
);

CREATE TABLE KafkaTable (
  name String,
  favorite_number int,
  favorite_color String
) WITH (
  'connector' = 'kafka',
  'topic' = 'testavro',
  'properties.bootstrap.servers' = '96.10.2.1:21005',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'avro'
);

insert into
  KafkaTable
select
  *
from
  testHdfs;

Save and submit the task.

9. Check whether there is data in the corresponding topic
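
For example, with the Kafka command-line consumer (the script name and path vary by distribution; the broker address matches the SQL above, and since the records are Avro-encoded binary, the output will not be human-readable text):

kafka-console-consumer.sh --bootstrap-server 96.10.2.1:21005 --topic testavro --from-beginning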

FlinkSQL successfully parsed the avro data.
