arrow系列6 --- arrow读写文件(Arrow File I/O)
作者:yunjinqi   类别:    日期:2023-10-14 14:47:39    阅读:122 次   消耗积分:0 分    

Apache Arrow提供了文件I/O功能,以便从应用程序的开始到结束使用Arrow。在本文中,您将学习:

  1. 将Arrow文件读入RecordBatch,然后再将其写出

  2. 将CSV文件读入Table,然后再将其写出

  3. 将Parquet文件读入Table,然后再将其写出

先决条件 

在继续之前,请确保您具备以下条件:

  1. 安装了Arrow,您可以在此处设置:在您自己的项目中使用Arrow C++

  2. 了解来自基本Arrow数据结构的基本Arrow数据结构

  3. 拥有一个用于运行最终应用程序的目录 - 该程序将生成一些文件,因此请做好准备。

设置 

在进行文件I/O之前,我们需要填补一些空白:

  • 我们需要包含必要的头文件。

  • 需要一个主函数(main())来将所有内容粘合在一起。

  • 我们需要一些文件来进行操作。

包含 在编写C++代码之前,我们需要一些包含文件。我们将包含输出所需的iostream,然后为本文中将使用的每种文件类型导入Arrow的I/O功能:

#include <arrow/api.h>
#include <arrow/csv/api.h>
#include <arrow/io/api.h>
#include <arrow/ipc/api.h>
#include <parquet/arrow/reader.h>
#include <parquet/arrow/writer.h>

#include <iostream>

主函数(main()) 

作为粘合剂,我们将使用前一篇关于数据结构的教程中的main()模式:

int main() {
  arrow::Status st = RunMain();
  if (!st.ok()) {
    std::cerr << st << std::endl;
    return 1;
  }
  return 0;
}

和之前一样,这将与RunMain()一起使用:

arrow::Status RunMain() {
    return arrow::Status::OK()
}

生成用于读取的文件 

我们需要一些文件来进行实际操作。在实践中,您可能会有自己应用程序的输入。然而,在这里,我们只是为了探索I/O而生成一些文件,以便更容易跟踪。

为了创建这些文件,我们将定义一个帮助函数,首先运行它。随时阅读此内容,但稍后的文章中将解释使用的概念。请注意,我们使用了先前教程中的日期/月份/年份数据。现在,只需复制该函数:

(以下是生成文件的帮助函数,可以稍后使用)

arrow::Status GenInitialFile() {
  // 创建一些8位整数数组和一个16位整数数组 - 就像
  // 基本的Arrow示例一样。
  arrow::Int8Builder int8builder;
  int8_t days_raw[5] = {1, 12, 17, 23, 28};
  ARROW_RETURN_NOT_OK(int8builder.AppendValues(days_raw, 5));
  std::shared_ptr<arrow::Array> days;
  ARROW_ASSIGN_OR_RAISE(days, int8builder.Finish());

  int8_t months_raw[5] = {1, 3, 5, 7, 1};
  ARROW_RETURN_NOT_OK(int8builder.AppendValues(months_raw, 5));
  std::shared_ptr<arrow::Array> months;
  ARROW_ASSIGN_OR_RAISE(months, int8builder.Finish());

  arrow::Int16Builder int16builder;
  int16_t years_raw[5] = {1990, 2000, 1995, 2000, 1995};
  ARROW_RETURN_NOT_OK(int16builder.AppendValues(years_raw, 5));
  std::shared_ptr<arrow::Array> years;
  ARROW_ASSIGN_OR_RAISE(years, int16builder.Finish());

  // 获取包含我们数组的向量
  std::vector<std::shared_ptr<arrow::Array>> columns = {days, months, years};

  // 创建用于初始化Table的模式
  std::shared_ptr<arrow::Field> field_day, field_month, field_year;
  std::shared_ptr<arrow::Schema> schema;

  field_day = arrow::field("Day", arrow::int8());
  field_month = arrow::field("Month", arrow::int8());
  field_year = arrow::field("Year", arrow::int16());

  schema = arrow::schema({field_day, field_month, field_year});
  // 使用模式和数据,创建Table
  std::shared_ptr<arrow::Table> table;
  table = arrow::Table::Make(schema, columns);

  // 将示例文件写入IPC、CSV和Parquet以供使用。
  std::shared_ptr<arrow::io::FileOutputStream> outfile;
  ARROW_ASSIGN_OR_RAISE(outfile, arrow::io::FileOutputStream::Open("test_in.arrow"));
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::ipc::RecordBatchWriter> ipc_writer,
                        arrow::ipc::MakeFileWriter(outfile, schema));
  ARROW_RETURN_NOT_OK(ipc_writer->WriteTable(*table));
  ARROW_RETURN_NOT_OK(ipc_writer->Close());

  ARROW_ASSIGN_OR_RAISE(outfile, arrow::io::FileOutputStream::Open("test_in.csv"));
  ARROW_ASSIGN_OR_RAISE(auto csv_writer,
                        arrow::csv::MakeCSVWriter(outfile, table->schema()));
  ARROW_RETURN_NOT_OK(csv_writer->WriteTable(*table));
  ARROW_RETURN_NOT_OK(csv_writer->Close());

  ARROW_ASSIGN_OR_RAISE(outfile, arrow::io::FileOutputStream::Open("test_in.parquet"));
  PARQUET_THROW_NOT_OK(
      parquet::arrow::WriteTable(*table, arrow::default_memory_pool(), outfile, 5));

  return arrow::Status::OK();
}

为了获取其余代码所需的文件,确保在RunMain()的第一行调用GenInitialFile()以初始化环境:

// 生成每种格式的初始文件的帮助函数 - 别担心,
// 我们也将在本示例中写入一个表。
ARROW_RETURN_NOT_OK(GenInitialFile());

Arrow文件I/O 

我们将逐步进行读取然后写入,如下所示:

读取文件 

  1. 打开文件 

  2. 将文件绑定到ipc::RecordBatchFileReader 

  3. 从文件中读取到RecordBatch

写文件 

  1. 获取io::FileOutputStream 

  2. 从RecordBatch写入文件

打开文件 要实际读取文件,我们需要获取某种指向文件的方式。在Arrow中,这意味着我们将获得一个io::ReadableFile对象 - 就像ArrayBuilder可以清除并创建新数组一样,我们可以将其重新分配给新文件,因此我们将在示例中使用此实例:

// 首先,我们必须设置一个ReadableFile对象,它只是让我们指向磁盘上正确数据的读取器。我们将重用这个对象,并将其重新绑定到多个文件中。
std::shared_ptr<arrow::io::ReadableFile> infile;

单独使用io::ReadableFile并不多 - 我们实际上将它绑定到文件并使用io::ReadableFile::Open()。对于我们的目的,使用默认参数就足够了:

// 获取"test_in.arrow"到我们的文件指针
ARROW_ASSIGN_OR_RAISE(infile, arrow::io::ReadableFile::Open(
                            "test_in.arrow", arrow::default_memory_pool()));

打开Arrow文件阅读器 

io::ReadableFile太通用,无法提供读取Arrow文件所需的所有功能。我们需要使用它来获取ipc::RecordBatchFileReader对象。此对象实现了正确格式的Arrow文件所需的所有逻辑。我们可以通过ipc::RecordBatchFileReader::Open()来获取一个

// 使用该读取器,我们可以读取Record Batches。请注意,这仅适用于IPC;对于其他格式,我们专注于Table,但在这里,使用RecordBatches。
std::shared_ptr<arrow::ipc::RecordBatchFileReader> ipc_reader;
ARROW_ASSIGN_OR_RAISE(ipc_reader, arrow::ipc::RecordBatchFileReader::Open(infile));

从已打开的Arrow文件读取到RecordBatch 要读取Arrow文件,我们必须使用RecordBatch,因此我们将获取一个RecordBatch。一旦获取了它,我们就可以实际读取文件。Arrow文件可以包含多个RecordBatches,因此我们必须传递一个索引。此文件只有一个RecordBatch,因此传递0:

// 使用读取器,我们可以读取Record Batches。请注意,这仅适用于IPC;对于其他格式,我们专注于Table,但在这里,使用RecordBatches。
std::shared_ptr<arrow::RecordBatch> rbatch;
ARROW_ASSIGN_OR_RAISE(rbatch, ipc_reader->ReadRecordBatch(0));

准备一个FileOutputStream 

对于输出,我们需要一个io::FileOutputStream。与我们的io::ReadableFile一样,我们将重复使用它,所以做好准备。打开文件的方式与读取时相同:

// 就像输入一样,我们获取一个输出文件对象。
std::shared_ptr<arrow::io::FileOutputStream> outfile;
// 将其绑定到 "test_out.arrow"
ARROW_ASSIGN_OR_RAISE(outfile, arrow::io::FileOutputStream::Open("test_out.arrow"));

从RecordBatch中写Arrow文件 现在,我们获取了先前读入的RecordBatch,并将其与目标文件一起使用,以创建一个ipc::RecordBatchWriter。

ipc::RecordBatchWriter需要两个东西:

  1. 目标文件

  2. 我们的RecordBatch的Schema(以防需要写入相同格式的多个RecordBatches)

Schema来自我们现有的RecordBatch,目标文件是我们刚刚创建的输出流。

// 使用输出文件和Schema来设置写入器!我们在这里定义了一切,准备开始。
ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::ipc::RecordBatchWriter> ipc_writer,
                        arrow::ipc::MakeFileWriter(outfile, rbatch->schema()));

我们可以直接调用ipc::RecordBatchWriter::WriteRecordBatch()来填充我们的文件:

// 写入RecordBatch。
ARROW_RETURN_NOT_OK(ipc_writer->WriteRecordBatch(*rbatch));

特别是对于IPC,写入器必须显式关闭,因为它预计可能会写入多个批次。要这样做:

// 特别是对于IPC,写入器需要明确关闭。
ARROW_RETURN_NOT_OK(ipc_writer->Close());

现在,我们已经读取并写入了IPC文件!

CSV文件I/O 

我们将逐步进行这些操作,首先是读取,然后是写入,如下所示:

读取文件

  1. 打开文件

  2. 准备Table

  3. 使用csv::TableReader读取文件

写入文件

  1. 获取io::FileOutputStream

  2. 从Table写入文件

打开CSV文件 

对于CSV文件,我们需要打开一个io::ReadableFile,就像对待Arrow文件一样,并重复使用之前的io::ReadableFile对象来完成:

// 将输入文件绑定到 "test_in.csv"
ARROW_ASSIGN_OR_RAISE(infile, arrow::io::ReadableFile::Open("test_in.csv"));

准备Table CSV可以读取到一个Table,因此声明一个指向Table的指针:

std::shared_ptr<arrow::Table> csv_table;

读取CSV文件到Table 

CSV读取器具有需要传递的选项结构 - 幸运的是,对于这些选项,我们可以直接使用默认值。有关其他选项的参考,请转到这里:文件格式。由于没有特殊的分隔符,而且文件很小,因此我们可以使用默认值创建读取器:

// CSV读取器有用于各种选项的几个对象。现在,我们将使用默认值。
ARROW_ASSIGN_OR_RAISE(
      auto csv_reader,
      arrow::csv::TableReader::Make(
          arrow::io::default_io_context(), infile, arrow::csv::ReadOptions::Defaults(),
          arrow::csv::ParseOptions::Defaults(), arrow::csv::ConvertOptions::Defaults()));

有了CSV读取器,我们可以使用它的csv::TableReader::Read()方法来填充我们的Table:

// 读取Table。
ARROW_ASSIGN_OR_RAISE(csv_table, csv_reader->Read())

从Table写入CSV文件 

将CSV写入Table的操作与将IPC写入RecordBatch的操作几乎相同,只是使用我们的Table,并使用ipc::RecordBatchWriter::WriteTable(),而不是ipc::RecordBatchWriter::WriteRecordBatch()。请注意,我们使用相同的写入器类 - 我们之所以使用ipc::RecordBatchWriter::WriteTable()是因为我们有一个Table。我们将定位到一个文件,使用我们Table的Schema,然后写入Table:

// 将输出文件绑定到 "test_out.csv"
ARROW_ASSIGN_OR_RAISE(outfile, arrow::io::FileOutputStream::Open("test_out.csv"));
// CSV写入器具有更简单的默认值,如果需要更复杂的用法,请查看API文档。
ARROW_ASSIGN_OR_RAISE(auto csv_writer,
                        arrow::csv::MakeCSVWriter(outfile, csv_table->schema()));
ARROW_RETURN_NOT_OK(csv_writer->WriteTable(*csv_table));
// 不是必需的,但是一种安全的做法。
ARROW_RETURN_NOT_OK(csv_writer->Close());

现在,我们已经读取并写入了CSV文件!

使用Parquet进行文件I/O 

我们将逐步进行这些操作,首先是读取,然后是写入,如下所示:

读取文件

  1. 打开文件

  2. 准备parquet::arrow::FileReader

  3. 读取文件到Table

写入文件

  • 将Table写入文件

打开Parquet文件 再次,这种文件格式,Parquet,需要一个io::ReadableFile,我们已经有了,需要在文件上调用io::ReadableFile::Open()方法:

// 将输入文件绑定到 "test_in.parquet"
ARROW_ASSIGN_OR_RAISE(infile, arrow::io::ReadableFile::Open("test_in.parquet"));

设置Parquet读取器 

像以往一样,我们需要一个读取器来实际读取文件。我们一直在从Arrow命名空间中获取各种文件格式的读取器。这一次,我们进入Parquet命名空间以获取parquet::arrow::FileReader:

std::unique_ptr<parquet::arrow::FileReader> reader;

现在,为了设置我们的读取器,我们调用parquet::arrow::OpenFile()。是的,即使我们使用了io::ReadableFile::Open(),这仍是必要的。请注意,我们通过引用传递我们的parquet::arrow::FileReader,而不是将其分配给输出:

// 请注意,Parquet的OpenFile()方法通过引用接受读取器,而不是返回读取器。
PARQUET_THROW_NOT_OK(
    parquet::arrow::OpenFile(infile, arrow::default_memory_pool(), &reader));

从Parquet文件读取到Table 

有了准备好的parquet::arrow::FileReader,我们可以将数据读取到Table中,除非我们必须通过引用传递Table而不是将其分配给输出:

std::shared_ptr<arrow::Table> parquet_table;
// 读取Table。
PARQUET_THROW_NOT_OK(reader->ReadTable(&parquet_table));

从Table写入Parquet文件 

对于一次性写入,写入Parquet文件不需要编写器对象。相反,我们将提供Table,指向其必要内存消耗的内存池,告诉它在哪里写入以及如果需要在文件中拆分数据的块大小:

// 写入Parquet不需要声明编写器对象。只需获取输出文件绑定,然后传递Table,内存池,输出和块大小以在磁盘上拆分Table。
ARROW_ASSIGN_OR_RAISE(outfile, arrow::io::FileOutputStream::Open("test_out.parquet"));
PARQUET_THROW_NOT_OK(parquet::arrow::WriteTable(
  *parquet_table, arrow::default_memory_pool(), outfile, 5));

结束程序 最后,我们只需返回Status::OK(),以便main()函数知道我们已经完成,一切都正常,就像在第一个教程中一样:

return arrow::Status::OK();

通过这种方式,您已经学会了在Arrow中读取和写入IPC、CSV和Parquet文件,可以正确加载数据并进行输出!现在,我们可以继续学习下一篇文章中的计算函数来处理数据。


版权所有,转载本站文章请注明出处:云子量化, http://www.woniunote.com/article/339
上一篇:arrow系列5---arrow的基本数据结构(Basic Arrow Data Structures)
下一篇:arrow系列7 ---运行arrow 读写操作的例子