Contents
  1. ValueVector
  2. Field
  3. Schema
  4. VectorSchemaRoot
  5. IPC
    5.1. Write
    5.2. Read
  6. Table (experimental)
  7. Dataset
  8. substrait/Acero
  • Other data operations

    The related concepts covered here are ValueVector, Field, Schema, VectorSchemaRoot, and Table.

    Add the required Maven dependencies:

    <dependency>
        <groupId>org.apache.arrow</groupId>
        <artifactId>arrow-memory-netty</artifactId>
        <version>${arrow.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.arrow</groupId>
        <artifactId>arrow-vector</artifactId>
        <version>${arrow.version}</version>
    </dependency>

    ValueVector

    A ValueVector represents a column of values of the same type. Each ValueVector instance corresponds to one field and holds all of that field's values. Apache Arrow ships many ValueVector subclasses for different data types, for example IntVector for integers and VarCharVector for strings; similarly there are BigIntVector, Float4Vector, Float8Vector, DateDayVector, ListVector, MapVector, StructVector, and more.

    BufferAllocator allocator = new RootAllocator();
    IntVector ageVector = new IntVector("age", allocator);
    VarCharVector nameVector = new VarCharVector("name", allocator);

    ageVector.allocateNew(3);       // allocate space for 3 values
    ageVector.set(0, 1);
    ageVector.setNull(1);           // the second value is null
    ageVector.set(2, 2);
    ageVector.setValueCount(3);     // declare how many values are populated
    System.out.println("Vector created in memory: " + ageVector);

    Field

    A Field holds the metadata of one column: the column name, its type, whether it is nullable, and an optional metadata map. Each Field corresponds to a ValueVector and describes that vector's metadata.

    Field age = new Field("age",
            FieldType.nullable(new ArrowType.Int(32, true)),
            /*children*/ null);
    Field name = new Field("name",
            FieldType.nullable(new ArrowType.Utf8()),
            /*children*/ null);
    System.out.println("Field created: " + name + ", Metadata: " + name.getMetadata());

    Schema

    A Schema is a collection of Fields. It describes the structure of a table and can also carry a metadata map.

    Map<String, String> metadata = new HashMap<>();
    metadata.put("K1", "V1");
    metadata.put("K2", "V2");
    Schema schema = new Schema(asList(age, name), metadata);   // asList is java.util.Arrays.asList
    System.out.println("Schema created: " + schema);

    VectorSchemaRoot

    A VectorSchemaRoot is the key abstraction that combines ValueVectors with a Schema; it represents a complete table of data. You can think of it as the columnar counterpart of a List<Record> in a row-oriented store.

    Here is an example of creating a VectorSchemaRoot:

    try (
        BufferAllocator allocator = new RootAllocator();
        VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator);
        IntVector ageVector = (IntVector) root.getVector("age");
        VarCharVector nameVector = (VarCharVector) root.getVector("name");
    ) {
        ageVector.allocateNew(3);
        ageVector.set(0, 10);
        ageVector.set(1, 20);
        ageVector.set(2, 30);
        nameVector.allocateNew(3);
        nameVector.set(0, "Dave".getBytes(StandardCharsets.UTF_8));
        nameVector.set(1, "Peter".getBytes(StandardCharsets.UTF_8));
        nameVector.set(2, "Mary".getBytes(StandardCharsets.UTF_8));
        root.setRowCount(3);
        System.out.println("VectorSchemaRoot created: \n" + root.contentToTSVString());
    }

    // If the vectors already exist, a VectorSchemaRoot can be built from them directly:
    List<Field> fields = Arrays.asList(ageVector.getField(), nameVector.getField());
    List<FieldVector> vectors = Arrays.asList(ageVector, nameVector);
    VectorSchemaRoot root = new VectorSchemaRoot(fields, vectors);


    Output:

    VectorSchemaRoot created:
    age name
    10 Dave
    20 Peter
    30 Mary
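
    Besides printing, a VectorSchemaRoot can be sliced into a new root covering a row range; a small sketch, assuming root is a populated VectorSchemaRoot like the one above and slice(index, length) as available in recent Arrow Java releases:

    // slice(1, 2) returns a new VectorSchemaRoot over the two rows starting at index 1
    VectorSchemaRoot lastTwo = root.slice(1, 2);
    System.out.println(lastTwo.contentToTSVString());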

    IPC

    Write

    File file = new File("random_access_file.arrow");
    try (
        FileOutputStream fileOutputStream = new FileOutputStream(file);
        ArrowFileWriter writer = new ArrowFileWriter(root, /*provider*/ null, fileOutputStream.getChannel());
    ) {
        writer.start();
        writer.writeBatch();   // writes the current contents of root as one record batch
        writer.end();
        System.out.println("Record batches written: " + writer.getRecordBlocks().size()
                + ". Number of rows written: " + root.getRowCount());
    } catch (IOException e) {
        e.printStackTrace();
    }
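
    The example above uses the random-access file format (ArrowFileWriter). Arrow IPC also defines a streaming format, suited to sockets and pipes; a hedged sketch with ArrowStreamWriter (the file name stream.arrows is only illustrative):

    try (
        FileOutputStream out = new FileOutputStream("stream.arrows");
        ArrowStreamWriter streamWriter = new ArrowStreamWriter(root, /*provider*/ null, out.getChannel());
    ) {
        streamWriter.start();
        streamWriter.writeBatch();   // may be called repeatedly after refilling root
        streamWriter.end();
    } catch (IOException e) {
        e.printStackTrace();
    }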

    Read

    import org.apache.arrow.memory.BufferAllocator;
    import org.apache.arrow.memory.RootAllocator;
    import org.apache.arrow.vector.VectorSchemaRoot;
    import org.apache.arrow.vector.ipc.ArrowFileReader;
    import org.apache.arrow.vector.ipc.message.ArrowBlock;

    import java.io.File;
    import java.io.FileInputStream;
    import java.io.IOException;

    try (
        BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE);
        FileInputStream fileInputStream = new FileInputStream(new File("random_access_file.arrow"));
        ArrowFileReader reader = new ArrowFileReader(fileInputStream.getChannel(), allocator);
    ) {
        System.out.println("Record batches in file: " + reader.getRecordBlocks().size());
        for (ArrowBlock arrowBlock : reader.getRecordBlocks()) {
            reader.loadRecordBatch(arrowBlock);
            VectorSchemaRoot root = reader.getVectorSchemaRoot();
            System.out.println("VectorSchemaRoot read: \n" + root.contentToTSVString());
        }
    } catch (IOException e) {
        e.printStackTrace();
    }
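
    The streaming-format counterpart reads batches sequentially with ArrowStreamReader; a minimal sketch matching the stream written above:

    try (
        BufferAllocator allocator = new RootAllocator();
        FileInputStream in = new FileInputStream("stream.arrows");
        ArrowStreamReader streamReader = new ArrowStreamReader(in, allocator);
    ) {
        while (streamReader.loadNextBatch()) {
            System.out.println(streamReader.getVectorSchemaRoot().contentToTSVString());
        }
    } catch (IOException e) {
        e.printStackTrace();
    }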

    Table (experimental)

    A Table is like an immutable VectorSchemaRoot without batch processing. The API can transfer the data of a VectorSchemaRoot into a Table (note: the data is moved, not copied).

    Table t = new Table(someVectorSchemaRoot);
    VectorSchemaRoot root = t.toVectorSchemaRoot();

    The Table API offers an efficient, row-oriented way of working with column-based data in memory at scale. It is a good choice when you need to process large amounts of data on the JVM and want to make efficient use of modern hardware. If a project calls for it, a separate post may cover it in more detail.

    Dataset

    Dataset is a universal layer in Apache Arrow for querying data in different formats or with different partitioning strategies.

    Currently supported file formats are:

    • Apache Arrow (.arrow)
    • Apache ORC (.orc)
    • Apache Parquet (.parquet)
    • Comma-Separated Values (.csv)
    • Line-delimited JSON Values (.json)
    The Dataset module needs an additional dependency:

    <dependency>
        <groupId>org.apache.arrow</groupId>
        <artifactId>arrow-dataset</artifactId>
        <version>15.0.0</version>
    </dependency>

    package arrow;

    import org.apache.arrow.dataset.file.*;
    import org.apache.arrow.dataset.jni.DirectReservationListener;
    import org.apache.arrow.dataset.jni.NativeMemoryPool;
    import org.apache.arrow.dataset.scanner.*;
    import org.apache.arrow.dataset.scanner.Scanner;
    import org.apache.arrow.dataset.source.*;
    import org.apache.arrow.memory.*;
    import org.apache.arrow.util.AutoCloseables;
    import org.apache.arrow.vector.*;
    import org.apache.arrow.vector.ipc.ArrowReader;
    import org.apache.arrow.vector.ipc.message.*;
    import org.apache.arrow.vector.types.pojo.Schema;

    import java.io.File;
    import java.net.URI;
    import java.net.URISyntaxException;
    import java.util.*;

    public class DatasetRead {
        public static void main(String[] args) throws URISyntaxException {
            // read data from a local CSV file
            // String uri = "file:///C:/Users/Manhua/IdeaProjects/spark-test/0__70006c85_2644_4f02_894b_0cfbffba0541.orc";
            String uri = "file:///C:/Users/Manhua/IdeaProjects/spark-test/1.csv";
            System.out.println(new File(new URI(uri)).length());
            // ScanOptions options = new ScanOptions(/*batchSize*/ 1, Optional.of(new String[] {"id", "user_id"}));
            ScanOptions options = new ScanOptions(/*batchSize*/ 1);
            try (
                BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE);
                DatasetFactory datasetFactory = new FileSystemDatasetFactory(
                        allocator, NativeMemoryPool.getDefault(),
                        FileFormat.CSV, uri);
                Dataset dataset = datasetFactory.finish();
                Scanner scanner = dataset.newScan(options);
                ArrowReader reader = scanner.scanBatches()
            ) {
                Schema schema = scanner.schema();
                // Schema schema = datasetFactory.inspect();
                System.out.println(schema.toJson());

                // List<ArrowRecordBatch> batches = new ArrayList<>();
                while (reader.loadNextBatch()) {
                    try (VectorSchemaRoot root = reader.getVectorSchemaRoot()) {
                        System.out.print(root.contentToTSVString());
                    }
                }

                // do something with the read record batches, for example:
                // analyzeArrowData(batches);

                // when the analysis is finished, close all resources:
                // AutoCloseables.close(batches);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    substrait/Acero

    Process data using SQL-style queries.
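
    In the Java bindings, Acero executes Substrait query plans over Arrow data; a SQL statement is typically compiled to a Substrait plan first (for example with the substrait-java Isthmus tool) and then run through org.apache.arrow.dataset.substrait.AceroSubstraitConsumer from the arrow-dataset module. The snippet below is a hedged sketch: plan.json, the NATION table name, and the reader variable are illustrative placeholders.

    // "reader" is an ArrowReader over the input data, e.g. from a Dataset scan (placeholder)
    Map<String, ArrowReader> namedTables = new HashMap<>();
    namedTables.put("NATION", reader);
    try (ArrowReader result = new AceroSubstraitConsumer(allocator)
            .runQuery(Files.readString(Paths.get("plan.json")), namedTables)) {
        while (result.loadNextBatch()) {
            System.out.println(result.getVectorSchemaRoot().contentToTSVString());
        }
    } catch (Exception e) {
        e.printStackTrace();
    }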

    Other data operations

    Data manipulation — Apache Arrow Java Cookbook documentation