Introduction and architecture of Apache Arrow, a high-performance data format library for the JVM (Gkatziouras)

Apache Arrow is a popular format used by various big data tools, including BigQuery, and handles both flat and hierarchical data. It is an in-memory format designed to speed up applications.

Apache Arrow is a commonly used library in data processing and data science. It is used by open source projects such as Apache Parquet, Apache Spark, and pandas, as well as many commercial or closed-source services. It provides the following features:

  • In-memory computing
  • Standardized columnar storage format
  • An IPC and RPC framework for data exchange between processes and nodes, respectively

Let's take a look at how things worked before Arrow came along:

We can see that in order for Spark to read data from a Parquet file, we need to read and deserialize the data in Parquet format. This requires us to make a complete copy of the data by loading it into memory. First, we read the data into an in-memory buffer and then use Parquet’s conversion methods to convert the data (e.g., a string or a number) into the representation used by our programming language. This is necessary because Parquet represents numbers differently than a programming language such as Python does.

This is a big problem for performance for many reasons:

  • We are copying the data and running transformation steps on it. The data is in a different format, so we must read and convert all of it before doing any calculations.
  • The data we are loading must fit into memory. You only have 8 GB of RAM and the data is 10 GB? You're out of luck!

Now, let’s look at how Apache Arrow improves this:

Instead of copying and transforming data, Arrow understands how to read and manipulate data directly. To this end, the Arrow community defined a new file format and operations that work directly on the serialized data. This data format can be read directly from disk without having to load it into memory and convert/deserialize the data. Of course, part of the data will still be loaded into RAM, but your data doesn't have to fit into memory: Arrow uses memory mapping to load only as much data into memory as is necessary and possible.

Apache Arrow supports the following languages:

  • C++
  • C#
  • Go
  • Java
  • JavaScript
  • Rust
  • Python (through the C++ library)
  • Ruby (through the C++ library)
  • R (through the C++ library)
  • MATLAB (through the C++ library)

Arrow Features

Arrow is first and foremost a library that provides columnar data structures for in-memory computing. Any data can be decompressed and decoded into Arrow columnar data structures so that in-memory analysis can then be performed on the decoded data. The Arrow column format has some nice properties: random access is O(1), and each value cell is adjacent to the previous and next in memory, so iteration is very efficient.
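
As a small illustration of these properties, here is a minimal sketch (the vector name and values are illustrative, not from the original article; it assumes the arrow-vector and arrow-memory-netty dependencies shown later) that fills an IntVector and reads a value back by index:

// Inside a method; RootAllocator and IntVector come from org.apache.arrow.memory / org.apache.arrow.vector
try (RootAllocator allocator = new RootAllocator();
     IntVector vector = new IntVector("numbers", allocator)) {
    vector.allocateNew(3);       // pre-allocate space for 3 values
    vector.set(0, 10);
    vector.set(1, 20);
    vector.set(2, 30);
    vector.setValueCount(3);     // mark how many slots are populated
    int second = vector.get(1);  // O(1) random access into contiguous memory
}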

Apache Arrow defines a binary "serialization" protocol for arranging collections of Arrow column arrays (called "record batches") that can be used for messaging and inter-process communication. Data in this protocol can be put anywhere, including on disk, and later memory-mapped or read into memory and sent somewhere else.

The Arrow protocol is designed so that you can "map" a block of Arrow data without any deserialization, so performing analytics on Arrow protocol data on disk can use memory mapping and effectively pay zero cost. This protocol is used for many things, such as streaming data between Spark SQL and Python in order to run pandas functions against chunks of Spark SQL data (these are called "pandas UDFs").

Arrow is designed for in-memory use (though you can put it on disk and then memory-map it), whereas Apache Parquet files are designed for disk storage. The two formats are designed to be compatible with each other and used together in applications.

In short: Apache Arrow defines a language-independent columnar memory format for flat and hierarchical data, organized for efficient analytical operations on modern hardware such as CPUs and GPUs. The Arrow memory format also supports zero-copy reads for lightning-fast data access without serialization overhead.

Apache Arrow for Java

Import the libraries:

<dependency>
    <groupId>org.apache.arrow</groupId>
    <artifactId>arrow-memory-netty</artifactId>
    <version>${arrow.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.arrow</groupId>
    <artifactId>arrow-vector</artifactId>
    <version>${arrow.version}</version>
</dependency>

Before we begin, it is important to understand that Arrow read/write operations use byte buffers; reading and writing are continuous exchanges of bytes. To improve efficiency, Arrow comes with a buffer allocator that can have a fixed size or expand automatically. The libraries that support allocation management are arrow-memory-netty and arrow-memory-unsafe; we use Netty here.
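
As a minimal sketch (the memory limit and buffer size below are illustrative, not from the original article), an allocator can be created with an optional memory limit, and buffers obtained from it must be released when no longer needed:

// RootAllocator with a 1 MiB limit; new RootAllocator() would be unbounded
try (RootAllocator allocator = new RootAllocator(1024 * 1024)) {
    ArrowBuf buffer = allocator.buffer(256);   // 256-byte off-heap buffer
    buffer.setInt(0, 42);                      // write an int at byte offset 0
    int value = buffer.getInt(0);              // read it back
    buffer.close();                            // release the buffer back to the allocator
}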

Storing data with Arrow requires a schema, which can be defined programmatically:

package com.gkatzioura.arrow;

import java.io.IOException;
import java.util.List;

import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.FieldType;
import org.apache.arrow.vector.types.pojo.Schema;

public class SchemaFactory {

    public static Schema DEFAULT_SCHEMA = createDefault();

    public static Schema createDefault() {
        var strField = new Field("col1", FieldType.nullable(new ArrowType.Utf8()), null);
        var intField = new Field("col2", FieldType.nullable(new ArrowType.Int(32, true)), null);

        return new Schema(List.of(strField, intField));
    }

    public static Schema schemaWithChildren() {
        var amount = new Field("amount", FieldType.nullable(new ArrowType.Decimal(19, 4, 128)), null);
        var currency = new Field("currency", FieldType.nullable(new ArrowType.Utf8()), null);
        var itemField = new Field("item", FieldType.nullable(new ArrowType.Utf8()), List.of(amount, currency));

        return new Schema(List.of(itemField));
    }

    public static Schema fromJson(String jsonString) {
        try {
            return Schema.fromJSON(jsonString);
        } catch (IOException e) {
            throw new ArrowExampleException(e);
        }
    }
}

Schemas also have a parsable JSON representation:

{
  "fields" : [ {
    "name" : "col1",
    "nullable" : true,
    "type" : {
      "name" : "utf8"
    },
    "children" : [ ]
  }, {
    "name" : "col2",
    "nullable" : true,
    "type" : {
      "name" : "int",
      "bitWidth" : 32,
      "isSigned" : true
    },
    "children" : [ ]
  } ]
}
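
A schema can be converted to and from this JSON form; for instance, a short sketch using the SchemaFactory above:

String json = SchemaFactory.DEFAULT_SCHEMA.toJson();  // serialize the schema to JSON
Schema parsed = SchemaFactory.fromJson(json);         // parse it back (wraps Schema.fromJSON)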

Additionally, just like in Avro, you can design complex schemas with embedded values on fields:

public static Schema schemaWithChildren() {
    var amount = new Field("amount", FieldType.nullable(new ArrowType.Decimal(19,4,128)), null);
    var currency = new Field("currency",FieldType.nullable(new ArrowType.Utf8()), null);
    var itemField = new Field("item", FieldType.nullable(new ArrowType.Utf8()), List.of(amount,currency));
 
    return new Schema(List.of(itemField));
}

Based on the schema above, we will create a DTO class:

package com.gkatzioura.arrow;
 
import lombok.Builder;
import lombok.Data;
 
@Data
@Builder
public class DefaultArrowEntry {
 
    private String col1;
    private Integer col2;
 
}

Our goal is to convert these Java objects into Arrow byte streams.

1. Create a DirectByteBuffer using an allocator

These buffers are off-heap. You do need to free the memory used; for the library user this is done by calling close() on the allocator. In our case, our class implements the Closeable interface, which closes the allocator.

Using the streaming API, the data will be streamed in the Arrow format to the supplied output channel:

package com.gkatzioura.arrow;
 
import java.io.Closeable;
import java.io.IOException;
import java.nio.channels.WritableByteChannel;
import java.util.List;
 
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.IntVector;
import org.apache.arrow.vector.VarCharVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.dictionary.DictionaryProvider;
import org.apache.arrow.vector.ipc.ArrowStreamWriter;
import org.apache.arrow.vector.util.Text;
 
import static com.gkatzioura.arrow.SchemaFactory.DEFAULT_SCHEMA;
 
public class DefaultEntriesWriter implements Closeable {
 
    private final RootAllocator rootAllocator;
    private final VectorSchemaRoot vectorSchemaRoot;
 
    public DefaultEntriesWriter() {
        rootAllocator = new RootAllocator();
        vectorSchemaRoot = VectorSchemaRoot.create(DEFAULT_SCHEMA, rootAllocator);
    }
 
    public void write(List<DefaultArrowEntry> defaultArrowEntries, int batchSize, WritableByteChannel out) {
        if (batchSize <= 0) {
            batchSize = defaultArrowEntries.size();
        }
 
        DictionaryProvider.MapDictionaryProvider dictProvider = new DictionaryProvider.MapDictionaryProvider();
        try(ArrowStreamWriter writer = new ArrowStreamWriter(vectorSchemaRoot, dictProvider, out)) {
            writer.start();
 
            VarCharVector childVector1 = (VarCharVector) vectorSchemaRoot.getVector(0);
            IntVector childVector2 = (IntVector) vectorSchemaRoot.getVector(1);
            childVector1.reset();
            childVector2.reset();
 
            boolean exactBatches = defaultArrowEntries.size()%batchSize == 0;
            int batchCounter = 0;
 
            for(int i=0; i < defaultArrowEntries.size(); i++) {
                childVector1.setSafe(batchCounter, new Text(defaultArrowEntries.get(i).getCol1()));
                childVector2.setSafe(batchCounter, defaultArrowEntries.get(i).getCol2());
 
                batchCounter++;
 
                if(batchCounter == batchSize) {
                    vectorSchemaRoot.setRowCount(batchSize);
                    writer.writeBatch();
                    batchCounter = 0;
                }
            }
 
            if(!exactBatches) {
                vectorSchemaRoot.setRowCount(batchCounter);
                writer.writeBatch();
            }
 
            writer.end();
        } catch (IOException e) {
            throw new ArrowExampleException(e);
        }
    }
 
    @Override
    public void close() throws IOException {
        vectorSchemaRoot.close();
        rootAllocator.close();
    }
 
}

To demonstrate Arrow's support for batching, a simple batching algorithm is implemented in this method; for our example, it is enough to know that the data is written in batches. A minimal usage sketch follows.
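
For illustration only (the file name and values are arbitrary, and the snippet assumes it runs in a method that may throw IOException), the writer could be used like this to write 1000 entries in batches of 100:

// Requires java.io.FileOutputStream, java.util.List, java.util.stream.IntStream, java.util.stream.Collectors
List<DefaultArrowEntry> entries = IntStream.range(0, 1000)
        .mapToObj(i -> DefaultArrowEntry.builder().col1("value-" + i).col2(i).build())
        .collect(Collectors.toList());

try (DefaultEntriesWriter writer = new DefaultEntriesWriter();
     FileOutputStream out = new FileOutputStream("entries.arrow")) {
    writer.write(entries, 100, out.getChannel());   // FileChannel implements WritableByteChannel
}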

Let's take a closer look at what the above code does:

Vector allocator creation:

public DefaultEntriesWriter() {
    rootAllocator = new RootAllocator();
    vectorSchemaRoot = VectorSchemaRoot.create(DEFAULT_SCHEMA, rootAllocator);
}

Then, when writing to the stream, an Arrow stream writer is instantiated and started:

ArrowStreamWriter writer = new ArrowStreamWriter(vectorSchemaRoot, dictProvider, out);
writer.start();

We obtain the vectors and reset them, leaving any preallocated buffers in place, before filling them with data:

VarCharVector childVector1 = (VarCharVector) vectorSchemaRoot.getVector(0);
IntVector childVector2 = (IntVector) vectorSchemaRoot.getVector(1);
childVector1.reset();
childVector2.reset();

When writing data, we use the setSafe operation, which allocates additional buffers if needed. In this example it is used on every write, but it can be avoided if the required operations and buffer sizes are taken into account:

childVector1.setSafe(batchCounter, new Text(defaultArrowEntries.get(i).getCol1()));
childVector2.setSafe(batchCounter, defaultArrowEntries.get(i).getCol2());
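
If the batch size and typical value sizes are known up front, a possible alternative (a sketch, not part of the original code; the 32-byte per-value estimate is arbitrary) is to pre-allocate the vectors once per batch and use the plain set operations:

// Pre-allocate capacity for a known batch size, then use set() instead of setSafe()
childVector1.allocateNew(batchSize * 32L, batchSize);  // rough byte estimate for the varchar data
childVector2.allocateNew(batchSize);

childVector1.set(batchCounter, new Text(defaultArrowEntries.get(i).getCol1()));
childVector2.set(batchCounter, defaultArrowEntries.get(i).getCol2());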

Then, write the batch to the stream:

vectorSchemaRoot.setRowCount(batchSize);
writer.writeBatch();

Last but not least, we close the writer:

@Override
public void close() throws IOException {
    vectorSchemaRoot.close();
    rootAllocator.close();
}
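
Reading the stream back is not covered by the class above; as a hedged sketch of the reverse path (the file name and loop body are illustrative, and the snippet assumes a method that may throw IOException), Arrow's ArrowStreamReader can iterate over the record batches:

// ArrowStreamReader comes from org.apache.arrow.vector.ipc
try (RootAllocator allocator = new RootAllocator();
     FileInputStream in = new FileInputStream("entries.arrow");
     ArrowStreamReader reader = new ArrowStreamReader(in.getChannel(), allocator)) {

    VectorSchemaRoot root = reader.getVectorSchemaRoot();
    while (reader.loadNextBatch()) {                        // one iteration per record batch
        VarCharVector col1 = (VarCharVector) root.getVector(0);
        IntVector col2 = (IntVector) root.getVector(1);
        for (int i = 0; i < root.getRowCount(); i++) {
            System.out.println(col1.getObject(i) + " | " + col2.get(i));
        }
    }
}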
