Apache Arrow is a popular format used by various big data tools, including BigQuery, for both flat and hierarchical data. It is an efficient in-memory representation designed to speed up applications, and it has become a commonly used library in data processing and data science. Arrow is used by open source projects such as Apache Parquet, Apache Spark, and pandas, as well as by many commercial and closed source services. It provides a number of features that we will look at below.
Let's take a look at how things worked before Arrow came along. For Spark to read data from a Parquet file, the data has to be read and deserialized from the Parquet format, which means making a complete copy of it in memory. First the data is read into an in-memory buffer, and then Parquet's conversion methods turn each value (for example a string or a number) into the representation used by our programming language. This is necessary because Parquet represents numbers differently than, say, Python does. This is a big problem for performance: the data is effectively held twice, and CPU time is spent converting values instead of analyzing them.
Now, let's look at how Apache Arrow improves this. Instead of copying and transforming the data, Arrow understands how to read and operate on it directly. To this end, the Arrow community defined a file format and operations that work directly on the serialized data. This format can be read straight from disk without having to load everything into memory and convert/deserialize it. Of course, part of the data will still be loaded into RAM, but the whole dataset does not have to fit in memory: Arrow's memory-mapping capability loads only as much data into memory as is necessary and possible. Apache Arrow has implementations in many languages, including C++, C#, Go, Java, JavaScript, Python, R, Ruby, and Rust.
Arrow Features

Arrow is first and foremost a library that provides columnar data structures for in-memory computing. Any data can be decompressed and decoded into Arrow columnar data structures, so that in-memory analytics can then be performed on the decoded data. The Arrow columnar format has some nice properties: random access is O(1), and each value cell is adjacent to the previous and the next in memory, so iteration is very efficient.

Apache Arrow also defines a binary "serialization" protocol for arranging collections of Arrow columnar arrays (called "record batches") that can be used for messaging and inter-process communication. You can put the protocol anywhere, including on disk, and later memory-map it or read it into memory and send it somewhere else. The Arrow protocol is designed so that you can "map" a block of Arrow data without any deserialization, so performing analytics on Arrow protocol data on disk can use memory mapping and pay effectively zero cost. The protocol is used for many things, such as streaming data between Spark SQL and Python in order to run pandas functions against chunks of Spark SQL data; these are called "pandas UDFs".

Arrow is designed for in-memory use, although you can put it on disk and memory-map it later, whereas Apache Parquet files are designed for disk storage. The two formats are complementary and are meant to be used together in applications.

In short, Apache Arrow defines a language-independent columnar format for flat and hierarchical data, organized for efficient analytical operations on modern hardware such as CPUs and GPUs. The Arrow memory format also supports zero-copy reads for lightning-fast data access without serialization overhead.

Apache Arrow for Java

Import the library:

<dependency>
    <groupId>org.apache.arrow</groupId>
    <artifactId>arrow-memory-netty</artifactId>
    <version>${arrow.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.arrow</groupId>
    <artifactId>arrow-vector</artifactId>
    <version>${arrow.version}</version>
</dependency>

Before we begin, it is important to understand that Arrow read/write operations work on byte buffers: reading and writing are continuous exchanges of bytes. To make this efficient, Arrow comes with a buffer allocator that can have a fixed size or expand automatically. The libraries that provide allocation management are arrow-memory-netty and arrow-memory-unsafe; we use netty here.
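To make the allocator idea concrete, here is a minimal sketch (not from the original article) that allocates an off-heap buffer from a RootAllocator, writes a value into it, and releases it again. The 1 MB limit and the class name are arbitrary example values, and the package names assume a recent Arrow version.

package com.gkatzioura.arrow;

import org.apache.arrow.memory.ArrowBuf;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;

public class AllocatorExample {

    public static void main(String[] args) {
        // Hypothetical limit of 1 MB; allocations beyond it are rejected.
        try (BufferAllocator allocator = new RootAllocator(1024 * 1024);
             ArrowBuf buffer = allocator.buffer(256)) { // 256 bytes, off-heap
            buffer.setInt(0, 42);                       // write an int at offset 0
            int value = buffer.getInt(0);               // read it back
            System.out.println("value = " + value);
        } // buffer and allocator are released here
    }
}

Because the buffers live off-heap, forgetting to close them (or the allocator) leaks native memory, which is why try-with-resources is used throughout.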
Storing data with Arrow requires a schema, which can be defined programmatically:

package com.gkatzioura.arrow;

import java.io.IOException;
import java.util.List;

import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.FieldType;
import org.apache.arrow.vector.types.pojo.Schema;

public class SchemaFactory {

    public static Schema DEFAULT_SCHEMA = createDefault();

    public static Schema createDefault() {
        var strField = new Field("col1", FieldType.nullable(new ArrowType.Utf8()), null);
        var intField = new Field("col2", FieldType.nullable(new ArrowType.Int(32, true)), null);
        return new Schema(List.of(strField, intField));
    }

    public static Schema schemaWithChildren() {
        var amount = new Field("amount", FieldType.nullable(new ArrowType.Decimal(19, 4, 128)), null);
        var currency = new Field("currency", FieldType.nullable(new ArrowType.Utf8()), null);
        var itemField = new Field("item", FieldType.nullable(new ArrowType.Utf8()), List.of(amount, currency));
        return new Schema(List.of(itemField));
    }

    public static Schema fromJson(String jsonString) {
        try {
            return Schema.fromJSON(jsonString);
        } catch (IOException e) {
            throw new ArrowExampleException(e);
        }
    }
}

Schemas also have a parsable JSON representation:

{
  "fields" : [ {
    "name" : "col1",
    "nullable" : true,
    "type" : { "name" : "utf8" },
    "children" : [ ]
  }, {
    "name" : "col2",
    "nullable" : true,
    "type" : { "name" : "int", "bitWidth" : 32, "isSigned" : true },
    "children" : [ ]
  } ]
}

Additionally, just like with Avro, you can design complex schemas with nested values on fields:

public static Schema schemaWithChildren() {
    var amount = new Field("amount", FieldType.nullable(new ArrowType.Decimal(19, 4, 128)), null);
    var currency = new Field("currency", FieldType.nullable(new ArrowType.Utf8()), null);
    var itemField = new Field("item", FieldType.nullable(new ArrowType.Utf8()), List.of(amount, currency));
    return new Schema(List.of(itemField));
}

Based on the schema above, we create a DTO for our entries:

package com.gkatzioura.arrow;

import lombok.Builder;
import lombok.Data;

@Data
@Builder
public class DefaultArrowEntry {

    private String col1;
    private Integer col2;
}

Our goal is to convert these Java objects into Arrow byte streams. The first step is to create a DirectByteBuffer using an allocator. These buffers are off-heap, so the memory they use must be freed; for the library user this is done by calling close() on the allocator. In our case, the writer class will implement the Closeable interface and close the allocator there.
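Before moving on to the writer, here is a small sketch (not part of the original article) of the JSON round trip mentioned above, using Schema's toJson() method together with the fromJson() helper of SchemaFactory:

package com.gkatzioura.arrow;

import org.apache.arrow.vector.types.pojo.Schema;

public class SchemaJsonExample {

    public static void main(String[] args) {
        // Serialize the default schema to its JSON representation...
        String json = SchemaFactory.DEFAULT_SCHEMA.toJson();
        System.out.println(json);

        // ...and parse it back into an equivalent Schema instance.
        Schema parsed = SchemaFactory.fromJson(json);
        System.out.println(SchemaFactory.DEFAULT_SCHEMA.equals(parsed)); // expected: true
    }
}

This round trip is handy when schemas need to be shipped as plain text, for example in configuration files.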
By using the streaming API, the data is streamed to a WritableByteChannel in the Arrow format:

package com.gkatzioura.arrow;

import java.io.Closeable;
import java.io.IOException;
import java.nio.channels.WritableByteChannel;
import java.util.List;

import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.IntVector;
import org.apache.arrow.vector.VarCharVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.dictionary.DictionaryProvider;
import org.apache.arrow.vector.ipc.ArrowStreamWriter;
import org.apache.arrow.vector.util.Text;

import static com.gkatzioura.arrow.SchemaFactory.DEFAULT_SCHEMA;

public class DefaultEntriesWriter implements Closeable {

    private final RootAllocator rootAllocator;
    private final VectorSchemaRoot vectorSchemaRoot;

    // Vector allocator creation
    public DefaultEntriesWriter() {
        rootAllocator = new RootAllocator();
        vectorSchemaRoot = VectorSchemaRoot.create(DEFAULT_SCHEMA, rootAllocator);
    }

    public void write(List<DefaultArrowEntry> defaultArrowEntries, int batchSize, WritableByteChannel out) {
        if (batchSize <= 0) {
            batchSize = defaultArrowEntries.size();
        }

        DictionaryProvider.MapDictionaryProvider dictProvider = new DictionaryProvider.MapDictionaryProvider();

        try (ArrowStreamWriter writer = new ArrowStreamWriter(vectorSchemaRoot, dictProvider, out)) {
            writer.start();

            VarCharVector childVector1 = (VarCharVector) vectorSchemaRoot.getVector(0);
            IntVector childVector2 = (IntVector) vectorSchemaRoot.getVector(1);
            childVector1.reset();
            childVector2.reset();

            boolean exactBatches = defaultArrowEntries.size() % batchSize == 0;
            int batchCounter = 0;

            for (int i = 0; i < defaultArrowEntries.size(); i++) {
                childVector1.setSafe(batchCounter, new Text(defaultArrowEntries.get(i).getCol1()));
                childVector2.setSafe(batchCounter, defaultArrowEntries.get(i).getCol2());

                batchCounter++;
                if (batchCounter == batchSize) {
                    vectorSchemaRoot.setRowCount(batchSize);
                    writer.writeBatch();
                    batchCounter = 0;
                }
            }

            if (!exactBatches) {
                vectorSchemaRoot.setRowCount(batchCounter);
                writer.writeBatch();
            }

            writer.end();
        } catch (IOException e) {
            throw new ArrowExampleException(e);
        }
    }

    @Override
    public void close() throws IOException {
        vectorSchemaRoot.close();
        rootAllocator.close();
    }
}

To demonstrate batching with Arrow, a simple batching algorithm is implemented in the write method; for this example, just consider that the data is written in batches.

Let's take a closer look at what the code above does. The vector allocator is created in the constructor:

public DefaultEntriesWriter() {
    rootAllocator = new RootAllocator();
    vectorSchemaRoot = VectorSchemaRoot.create(DEFAULT_SCHEMA, rootAllocator);
}

When writing to the stream, an Arrow stream writer is created and started:

ArrowStreamWriter writer = new ArrowStreamWriter(vectorSchemaRoot, dictProvider, out);
writer.start();

We fill the vectors with data, and we also reset them while leaving the preallocated buffers in place:

VarCharVector childVector1 = (VarCharVector) vectorSchemaRoot.getVector(0);
IntVector childVector2 = (IntVector) vectorSchemaRoot.getVector(1);
childVector1.reset();
childVector2.reset();

When writing data, we use the setSafe operation, which allocates additional buffers if needed.
For this example that check is done on every write, but it can be avoided once the required operations and buffer sizes are taken into account:

childVector1.setSafe(batchCounter, new Text(defaultArrowEntries.get(i).getCol1()));
childVector2.setSafe(batchCounter, defaultArrowEntries.get(i).getCol2());

Then we set the row count and write the batch to the stream:

vectorSchemaRoot.setRowCount(batchSize);
writer.writeBatch();

Last but not least, we close the writer:

@Override
public void close() throws IOException {
    vectorSchemaRoot.close();
    rootAllocator.close();
}

This concludes the introduction to the architecture of Apache Arrow, a high-performance data format library on the JVM (Gkatziouras). To wrap up, a short end-to-end usage sketch of the writer follows.
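The following is a minimal, hypothetical usage sketch that is not taken from the original article: it builds a few DefaultArrowEntry objects with the Lombok builder, opens a file channel, and streams them with the DefaultEntriesWriter defined above. The file name, entry count, and batch size are arbitrary example values.

package com.gkatzioura.arrow;

import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.channels.WritableByteChannel;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class DefaultEntriesWriterExample {

    public static void main(String[] args) throws IOException {
        // Generate some sample entries using the DTO's builder.
        List<DefaultArrowEntry> entries = IntStream.range(0, 1000)
                .mapToObj(i -> DefaultArrowEntry.builder()
                        .col1("value-" + i)
                        .col2(i)
                        .build())
                .collect(Collectors.toList());

        // Stream the entries to a file in the Arrow streaming format, 100 rows per batch.
        try (DefaultEntriesWriter writer = new DefaultEntriesWriter();
             FileOutputStream fos = new FileOutputStream("entries.arrow");
             WritableByteChannel channel = fos.getChannel()) {
            writer.write(entries, 100, channel);
        }
    }
}

Because DefaultEntriesWriter implements Closeable, the try-with-resources block releases the vectors and the allocator once the write is finished.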