The related core concepts are ValueVector, Field, Schema, VectorSchemaRoot, and Table.
Maven dependencies:

```xml
<dependency>
    <groupId>org.apache.arrow</groupId>
    <artifactId>arrow-memory-netty</artifactId>
    <version>${arrow.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.arrow</groupId>
    <artifactId>arrow-vector</artifactId>
    <version>${arrow.version}</version>
</dependency>
```
## ValueVector

A ValueVector represents a column of values that all share the same type. Each ValueVector instance represents one field and holds all of that field's values. Apache Arrow provides a wide range of ValueVector subclasses for different data types, such as IntVector for integers and VarCharVector for strings. Similarly, there are BigIntVector, Float4Vector, Float8Vector, DateDayVector, ListVector, MapVector, StructVector, and so on.
```java
// Vectors hold off-heap memory, so the allocator and the vectors should
// be closed; try-with-resources takes care of that.
try (BufferAllocator allocator = new RootAllocator();
     IntVector ageVector = new IntVector("age", allocator);
     VarCharVector nameVector = new VarCharVector("name", allocator)) {
    ageVector.allocateNew(3);
    ageVector.set(0, 1);
    ageVector.setNull(1);      // the second slot is null
    ageVector.set(2, 2);
    ageVector.setValueCount(3);
    System.out.println("Vector created in memory: " + ageVector);
}
```
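Reading values back works the same way. A minimal sketch that could sit inside the same try block as above ("Alice" is an illustrative value):

```java
// VarCharVector stores UTF-8 bytes; setSafe() grows the buffers as needed
nameVector.setSafe(0, "Alice".getBytes(StandardCharsets.UTF_8));
nameVector.setValueCount(1);
System.out.println(new String(nameVector.get(0), StandardCharsets.UTF_8)); // Alice

// Check isNull() before get(): reading a null slot throws an exception
for (int i = 0; i < ageVector.getValueCount(); i++) {
    System.out.println("age[" + i + "] = "
            + (ageVector.isNull(i) ? "null" : ageVector.get(i)));
}
```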
## Field

A Field holds the metadata of a single column: the column name, its type, whether it is nullable, and an optional metadata map. Each Field object corresponds to one ValueVector and describes that vector's metadata.
```java
Field age = new Field("age",
        FieldType.nullable(new ArrowType.Int(32, true)),
        /*children*/ null);
Field name = new Field("name",
        FieldType.nullable(new ArrowType.Utf8()),
        /*children*/ null);
System.out.println("Field created: " + name + ", Metadata: " + name.getMetadata());
```
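The fields above carry no metadata, so the printed map is empty. To attach metadata to an individual Field, you can use the full FieldType constructor; a minimal sketch, where the key/value pair is made up for illustration:

```java
Map<String, String> fieldMetadata = new HashMap<>();
fieldMetadata.put("comment", "user display name"); // illustrative entry

// FieldType(nullable, type, dictionary encoding, metadata)
Field nameWithMeta = new Field("name",
        new FieldType(true, new ArrowType.Utf8(), /*dictionary*/ null, fieldMetadata),
        /*children*/ null);
System.out.println(nameWithMeta.getMetadata()); // {comment=user display name}
```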
## Schema

A Schema is an ordered collection of Fields. It describes the structure of a table and can also carry a metadata map of its own.
```java
Map<String, String> metadata = new HashMap<>();
metadata.put("K1", "V1");
metadata.put("K2", "V2");
Schema schema = new Schema(asList(age, name), metadata);
System.out.println("Schema created: " + schema);
```
## VectorSchemaRoot

A VectorSchemaRoot is the key abstraction that combines ValueVectors with a Schema; it can represent a complete table of data. You can think of it as the columnar counterpart of a List<Record> in a row-oriented store.
Here is an example of creating a VectorSchemaRoot:
```java
try (BufferAllocator allocator = new RootAllocator();
     VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator)) {
    // The vectors are owned by the root; closing the root releases them,
    // so they are obtained inside the block rather than as resources.
    IntVector ageVector = (IntVector) root.getVector("age");
    VarCharVector nameVector = (VarCharVector) root.getVector("name");

    ageVector.allocateNew(3);
    ageVector.set(0, 10);
    ageVector.set(1, 20);
    ageVector.set(2, 30);
    nameVector.allocateNew(3);
    nameVector.set(0, "Dave".getBytes(StandardCharsets.UTF_8));
    nameVector.set(1, "Peter".getBytes(StandardCharsets.UTF_8));
    nameVector.set(2, "Mary".getBytes(StandardCharsets.UTF_8));
    root.setRowCount(3);
    System.out.println("VectorSchemaRoot created: \n" + root.contentToTSVString());
}

// If you have already created the vectors yourself, you can build a
// VectorSchemaRoot from them directly:
List<Field> fields = Arrays.asList(ageVector.getField(), nameVector.getField());
List<FieldVector> vectors = Arrays.asList(ageVector, nameVector);
VectorSchemaRoot root = new VectorSchemaRoot(fields, vectors);
```
Output:

```text
VectorSchemaRoot created: 
age	name
10	Dave
20	Peter
30	Mary
```
## IPC

### Write

```java
File file = new File("random_access_file.arrow");
try (FileOutputStream fileOutputStream = new FileOutputStream(file);
     ArrowFileWriter writer = new ArrowFileWriter(root, /*provider*/ null,
             fileOutputStream.getChannel())) {
    writer.start();
    writer.writeBatch();
    writer.end();
    System.out.println("Record batches written: " + writer.getRecordBlocks().size()
            + ". Number of rows written: " + root.getRowCount());
} catch (IOException e) {
    e.printStackTrace();
}
```
### Read

```java
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.ipc.ArrowFileReader;
import org.apache.arrow.vector.ipc.message.ArrowBlock;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;

try (BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE);
     FileInputStream fileInputStream = new FileInputStream(new File("random_access_file.arrow"));
     ArrowFileReader reader = new ArrowFileReader(fileInputStream.getChannel(), allocator)) {
    System.out.println("Record batches in file: " + reader.getRecordBlocks().size());
    for (ArrowBlock arrowBlock : reader.getRecordBlocks()) {
        reader.loadRecordBatch(arrowBlock);
        VectorSchemaRoot root = reader.getVectorSchemaRoot();
        System.out.println("VectorSchemaRoot read: \n" + root.contentToTSVString());
    }
} catch (IOException e) {
    e.printStackTrace();
}
```
## Table (experimental)

A Table is like an immutable VectorSchemaRoot without support for batch processing. The API lets you move the data of a VectorSchemaRoot into a Table (note: the data is transferred, not copied):
```java
Table table = new Table(someVectorSchemaRoot);       // takes over the data
VectorSchemaRoot root = table.toVectorSchemaRoot();  // transfers it back out
```
The Table API offers an efficient, row-centric way of working with large, column-based in-memory data. When you need to process large datasets in a JVM environment and want to make good use of modern hardware, the Table API is a strong option; a sketch of row iteration follows below. If a project ends up needing it, a dedicated follow-up post may cover it in more detail.
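As a taste of the row-centric access, here is a minimal sketch of iterating a Table row by row, assuming the age/name schema from earlier; the classes live in org.apache.arrow.vector.table, and since the API is experimental this should be treated as a sketch rather than definitive usage:

```java
import org.apache.arrow.vector.table.Row;
import org.apache.arrow.vector.table.Table;

// Build a Table from an existing, populated VectorSchemaRoot; the data is
// transferred, so the root is empty afterwards.
try (Table table = new Table(root)) {
    for (Row row : table) {                      // Table implements Iterable<Row>
        int age = row.getInt("age");             // typed, by-name column accessors
        String name = row.getVarCharObj("name"); // decodes UTF-8 bytes to String
        System.out.println(name + " is " + age);
    }
}
```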
## Dataset

Dataset is a universal layer in Apache Arrow for querying data in different formats or in different partitioning strategies.
Currently supported file formats are:

- Apache Arrow (.arrow)
- Apache ORC (.orc)
- Apache Parquet (.parquet)
- Comma-Separated Values (.csv)
- Line-delimited JSON Values (.json)
```xml
<dependency>
    <groupId>org.apache.arrow</groupId>
    <artifactId>arrow-dataset</artifactId>
    <version>15.0.0</version>
</dependency>
```
```java
package arrow;

import org.apache.arrow.dataset.file.FileFormat;
import org.apache.arrow.dataset.file.FileSystemDatasetFactory;
import org.apache.arrow.dataset.jni.NativeMemoryPool;
import org.apache.arrow.dataset.scanner.ScanOptions;
import org.apache.arrow.dataset.scanner.Scanner;
import org.apache.arrow.dataset.source.Dataset;
import org.apache.arrow.dataset.source.DatasetFactory;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.ipc.ArrowReader;
import org.apache.arrow.vector.types.pojo.Schema;

import java.io.File;
import java.net.URI;
import java.net.URISyntaxException;

public class DatasetRead {
    public static void main(String[] args) throws URISyntaxException {
        String uri = "file:///C:/Users/Manhua/IdeaProjects/spark-test/1.csv";
        System.out.println(new File(new URI(uri)).length());

        ScanOptions options = new ScanOptions(/*batchSize*/ 1);
        try (BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE);
             DatasetFactory datasetFactory = new FileSystemDatasetFactory(
                     allocator, NativeMemoryPool.getDefault(), FileFormat.CSV, uri);
             Dataset dataset = datasetFactory.finish();
             Scanner scanner = dataset.newScan(options);
             ArrowReader reader = scanner.scanBatches()) {
            // The schema is inferred from the file by the dataset factory
            Schema schema = scanner.schema();
            System.out.println(schema.toJson());
            while (reader.loadNextBatch()) {
                try (VectorSchemaRoot root = reader.getVectorSchemaRoot()) {
                    System.out.print(root.contentToTSVString());
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
```
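The batch size of 1 above is only for demonstration; a real scan would use a much larger value. ScanOptions can also project a subset of columns; a minimal sketch (the column name is hypothetical and must match the file's schema):

```java
import java.util.Optional;

// Read up to 32768 rows per batch and only materialize the "age" column
ScanOptions projected = new ScanOptions(/*batchSize*/ 32768,
        Optional.of(new String[] {"age"}));
```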
## substrait/Acero

Substrait plans executed by Acero let you process data in a SQL-like fashion; a rough sketch follows below.
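A rough sketch of running a Substrait plan against named in-memory tables via the arrow-dataset module's AceroSubstraitConsumer; the plan string, table name, and reader here are placeholders, and the exact signatures should be checked against the Arrow Java documentation:

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.arrow.dataset.substrait.AceroSubstraitConsumer;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.ipc.ArrowReader;

// 'substraitPlanJson' would be a Substrait plan serialized as JSON (for
// example generated from a SQL statement with the Isthmus tool), and
// 'tableReader' an ArrowReader over the data registered under the table
// name referenced in the plan. Both are hypothetical placeholders.
Map<String, ArrowReader> namedTables = new HashMap<>();
namedTables.put("USERS", tableReader);

try (BufferAllocator allocator = new RootAllocator();
     ArrowReader result = new AceroSubstraitConsumer(allocator)
             .runQuery(substraitPlanJson, namedTables)) {
    while (result.loadNextBatch()) {
        System.out.print(result.getVectorSchemaRoot().contentToTSVString());
    }
}
```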
## Other data operations

See Data manipulation — Apache Arrow Java Cookbook documentation.